##// END OF EJS Templates
Merge pull request #1391 from minrk/no_engines...
Min RK -
r6104:821fac24 merge
parent child Browse files
Show More
@@ -1,1448 +1,1452 b''
1 """A semi-synchronous Client for the ZMQ cluster
1 """A semi-synchronous Client for the ZMQ cluster
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import json
19 import json
20 import sys
20 import sys
21 import time
21 import time
22 import warnings
22 import warnings
23 from datetime import datetime
23 from datetime import datetime
24 from getpass import getpass
24 from getpass import getpass
25 from pprint import pprint
25 from pprint import pprint
26
26
27 pjoin = os.path.join
27 pjoin = os.path.join
28
28
29 import zmq
29 import zmq
30 # from zmq.eventloop import ioloop, zmqstream
30 # from zmq.eventloop import ioloop, zmqstream
31
31
32 from IPython.config.configurable import MultipleInstanceError
32 from IPython.config.configurable import MultipleInstanceError
33 from IPython.core.application import BaseIPythonApplication
33 from IPython.core.application import BaseIPythonApplication
34
34
35 from IPython.utils.jsonutil import rekey
35 from IPython.utils.jsonutil import rekey
36 from IPython.utils.localinterfaces import LOCAL_IPS
36 from IPython.utils.localinterfaces import LOCAL_IPS
37 from IPython.utils.path import get_ipython_dir
37 from IPython.utils.path import get_ipython_dir
38 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
38 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
39 Dict, List, Bool, Set)
39 Dict, List, Bool, Set)
40 from IPython.external.decorator import decorator
40 from IPython.external.decorator import decorator
41 from IPython.external.ssh import tunnel
41 from IPython.external.ssh import tunnel
42
42
43 from IPython.parallel import Reference
43 from IPython.parallel import Reference
44 from IPython.parallel import error
44 from IPython.parallel import error
45 from IPython.parallel import util
45 from IPython.parallel import util
46
46
47 from IPython.zmq.session import Session, Message
47 from IPython.zmq.session import Session, Message
48
48
49 from .asyncresult import AsyncResult, AsyncHubResult
49 from .asyncresult import AsyncResult, AsyncHubResult
50 from IPython.core.profiledir import ProfileDir, ProfileDirError
50 from IPython.core.profiledir import ProfileDir, ProfileDirError
51 from .view import DirectView, LoadBalancedView
51 from .view import DirectView, LoadBalancedView
52
52
53 if sys.version_info[0] >= 3:
53 if sys.version_info[0] >= 3:
54 # xrange is used in a couple 'isinstance' tests in py2
54 # xrange is used in a couple 'isinstance' tests in py2
55 # should be just 'range' in 3k
55 # should be just 'range' in 3k
56 xrange = range
56 xrange = range
57
57
58 #--------------------------------------------------------------------------
58 #--------------------------------------------------------------------------
59 # Decorators for Client methods
59 # Decorators for Client methods
60 #--------------------------------------------------------------------------
60 #--------------------------------------------------------------------------
61
61
@decorator
def spin_first(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method."""
    # flush incoming results/registration changes so the wrapped
    # method operates on up-to-date client state
    self.spin()
    result = f(self, *args, **kwargs)
    return result
67
67
68
68
69 #--------------------------------------------------------------------------
69 #--------------------------------------------------------------------------
70 # Classes
70 # Classes
71 #--------------------------------------------------------------------------
71 #--------------------------------------------------------------------------
72
72
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        # the fixed set of legal keys, with their default values.
        # update() goes through dict.update, which does NOT route through
        # our strict __setitem__, so the defaults install cleanly.
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,

              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
            }
        self.update(md)
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # `key in self` is an O(1) membership test and works on both
        # Python 2 and 3; `key in self.iterkeys()` was a py2-only O(n) scan
        if key in self:
            return self[key]
        else:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        # only keys established in __init__ may be assigned
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        # reject any key that is not part of the fixed schema
        if key in self:
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
123
123
124
124
125 class Client(HasTraits):
125 class Client(HasTraits):
126 """A semi-synchronous client to the IPython ZMQ cluster
126 """A semi-synchronous client to the IPython ZMQ cluster
127
127
128 Parameters
128 Parameters
129 ----------
129 ----------
130
130
131 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
131 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
132 Connection information for the Hub's registration. If a json connector
132 Connection information for the Hub's registration. If a json connector
133 file is given, then likely no further configuration is necessary.
133 file is given, then likely no further configuration is necessary.
134 [Default: use profile]
134 [Default: use profile]
135 profile : bytes
135 profile : bytes
136 The name of the Cluster profile to be used to find connector information.
136 The name of the Cluster profile to be used to find connector information.
137 If run from an IPython application, the default profile will be the same
137 If run from an IPython application, the default profile will be the same
138 as the running application, otherwise it will be 'default'.
138 as the running application, otherwise it will be 'default'.
139 context : zmq.Context
139 context : zmq.Context
140 Pass an existing zmq.Context instance, otherwise the client will create its own.
140 Pass an existing zmq.Context instance, otherwise the client will create its own.
141 debug : bool
141 debug : bool
142 flag for lots of message printing for debug purposes
142 flag for lots of message printing for debug purposes
143 timeout : int/float
143 timeout : int/float
144 time (in seconds) to wait for connection replies from the Hub
144 time (in seconds) to wait for connection replies from the Hub
145 [Default: 10]
145 [Default: 10]
146
146
147 #-------------- session related args ----------------
147 #-------------- session related args ----------------
148
148
149 config : Config object
149 config : Config object
150 If specified, this will be relayed to the Session for configuration
150 If specified, this will be relayed to the Session for configuration
151 username : str
151 username : str
152 set username for the session object
152 set username for the session object
153 packer : str (import_string) or callable
153 packer : str (import_string) or callable
154 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
154 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
155 function to serialize messages. Must support same input as
155 function to serialize messages. Must support same input as
156 JSON, and output must be bytes.
156 JSON, and output must be bytes.
157 You can pass a callable directly as `pack`
157 You can pass a callable directly as `pack`
158 unpacker : str (import_string) or callable
158 unpacker : str (import_string) or callable
159 The inverse of packer. Only necessary if packer is specified as *not* one
159 The inverse of packer. Only necessary if packer is specified as *not* one
160 of 'json' or 'pickle'.
160 of 'json' or 'pickle'.
161
161
162 #-------------- ssh related args ----------------
162 #-------------- ssh related args ----------------
163 # These are args for configuring the ssh tunnel to be used
163 # These are args for configuring the ssh tunnel to be used
164 # credentials are used to forward connections over ssh to the Controller
164 # credentials are used to forward connections over ssh to the Controller
165 # Note that the ip given in `addr` needs to be relative to sshserver
165 # Note that the ip given in `addr` needs to be relative to sshserver
166 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
166 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
167 # and set sshserver as the same machine the Controller is on. However,
167 # and set sshserver as the same machine the Controller is on. However,
168 # the only requirement is that sshserver is able to see the Controller
168 # the only requirement is that sshserver is able to see the Controller
169 # (i.e. is within the same trusted network).
169 # (i.e. is within the same trusted network).
170
170
171 sshserver : str
171 sshserver : str
172 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
172 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
173 If keyfile or password is specified, and this is not, it will default to
173 If keyfile or password is specified, and this is not, it will default to
174 the ip given in addr.
174 the ip given in addr.
175 sshkey : str; path to ssh private key file
175 sshkey : str; path to ssh private key file
176 This specifies a key to be used in ssh login, default None.
176 This specifies a key to be used in ssh login, default None.
177 Regular default ssh keys will be used without specifying this argument.
177 Regular default ssh keys will be used without specifying this argument.
178 password : str
178 password : str
179 Your ssh password to sshserver. Note that if this is left None,
179 Your ssh password to sshserver. Note that if this is left None,
180 you will be prompted for it if passwordless key based login is unavailable.
180 you will be prompted for it if passwordless key based login is unavailable.
181 paramiko : bool
181 paramiko : bool
182 flag for whether to use paramiko instead of shell ssh for tunneling.
182 flag for whether to use paramiko instead of shell ssh for tunneling.
183 [default: True on win32, False else]
183 [default: True on win32, False else]
184
184
185 ------- exec authentication args -------
185 ------- exec authentication args -------
186 If even localhost is untrusted, you can have some protection against
186 If even localhost is untrusted, you can have some protection against
187 unauthorized execution by signing messages with HMAC digests.
187 unauthorized execution by signing messages with HMAC digests.
188 Messages are still sent as cleartext, so if someone can snoop your
188 Messages are still sent as cleartext, so if someone can snoop your
189 loopback traffic this will not protect your privacy, but will prevent
189 loopback traffic this will not protect your privacy, but will prevent
190 unauthorized execution.
190 unauthorized execution.
191
191
192 exec_key : str
192 exec_key : str
193 an authentication key or file containing a key
193 an authentication key or file containing a key
194 default: None
194 default: None
195
195
196
196
197 Attributes
197 Attributes
198 ----------
198 ----------
199
199
200 ids : list of int engine IDs
200 ids : list of int engine IDs
201 requesting the ids attribute always synchronizes
201 requesting the ids attribute always synchronizes
202 the registration state. To request ids without synchronization,
202 the registration state. To request ids without synchronization,
203 use semi-private _ids attributes.
203 use semi-private _ids attributes.
204
204
205 history : list of msg_ids
205 history : list of msg_ids
206 a list of msg_ids, keeping track of all the execution
206 a list of msg_ids, keeping track of all the execution
207 messages you have submitted in order.
207 messages you have submitted in order.
208
208
209 outstanding : set of msg_ids
209 outstanding : set of msg_ids
210 a set of msg_ids that have been submitted, but whose
210 a set of msg_ids that have been submitted, but whose
211 results have not yet been received.
211 results have not yet been received.
212
212
213 results : dict
213 results : dict
214 a dict of all our results, keyed by msg_id
214 a dict of all our results, keyed by msg_id
215
215
216 block : bool
216 block : bool
217 determines default behavior when block not specified
217 determines default behavior when block not specified
218 in execution methods
218 in execution methods
219
219
220 Methods
220 Methods
221 -------
221 -------
222
222
223 spin
223 spin
224 flushes incoming results and registration state changes
224 flushes incoming results and registration state changes
225 control methods spin, and requesting `ids` also ensures up to date
225 control methods spin, and requesting `ids` also ensures up to date
226
226
227 wait
227 wait
228 wait on one or more msg_ids
228 wait on one or more msg_ids
229
229
230 execution methods
230 execution methods
231 apply
231 apply
232 legacy: execute, run
232 legacy: execute, run
233
233
234 data movement
234 data movement
235 push, pull, scatter, gather
235 push, pull, scatter, gather
236
236
237 query methods
237 query methods
238 queue_status, get_result, purge, result_status
238 queue_status, get_result, purge, result_status
239
239
240 control methods
240 control methods
241 abort, shutdown
241 abort, shutdown
242
242
243 """
243 """
244
244
245
245
246 block = Bool(False)
246 block = Bool(False)
247 outstanding = Set()
247 outstanding = Set()
248 results = Instance('collections.defaultdict', (dict,))
248 results = Instance('collections.defaultdict', (dict,))
249 metadata = Instance('collections.defaultdict', (Metadata,))
249 metadata = Instance('collections.defaultdict', (Metadata,))
250 history = List()
250 history = List()
251 debug = Bool(False)
251 debug = Bool(False)
252
252
253 profile=Unicode()
253 profile=Unicode()
254 def _profile_default(self):
254 def _profile_default(self):
255 if BaseIPythonApplication.initialized():
255 if BaseIPythonApplication.initialized():
256 # an IPython app *might* be running, try to get its profile
256 # an IPython app *might* be running, try to get its profile
257 try:
257 try:
258 return BaseIPythonApplication.instance().profile
258 return BaseIPythonApplication.instance().profile
259 except (AttributeError, MultipleInstanceError):
259 except (AttributeError, MultipleInstanceError):
260 # could be a *different* subclass of config.Application,
260 # could be a *different* subclass of config.Application,
261 # which would raise one of these two errors.
261 # which would raise one of these two errors.
262 return u'default'
262 return u'default'
263 else:
263 else:
264 return u'default'
264 return u'default'
265
265
266
266
267 _outstanding_dict = Instance('collections.defaultdict', (set,))
267 _outstanding_dict = Instance('collections.defaultdict', (set,))
268 _ids = List()
268 _ids = List()
269 _connected=Bool(False)
269 _connected=Bool(False)
270 _ssh=Bool(False)
270 _ssh=Bool(False)
271 _context = Instance('zmq.Context')
271 _context = Instance('zmq.Context')
272 _config = Dict()
272 _config = Dict()
273 _engines=Instance(util.ReverseDict, (), {})
273 _engines=Instance(util.ReverseDict, (), {})
274 # _hub_socket=Instance('zmq.Socket')
274 # _hub_socket=Instance('zmq.Socket')
275 _query_socket=Instance('zmq.Socket')
275 _query_socket=Instance('zmq.Socket')
276 _control_socket=Instance('zmq.Socket')
276 _control_socket=Instance('zmq.Socket')
277 _iopub_socket=Instance('zmq.Socket')
277 _iopub_socket=Instance('zmq.Socket')
278 _notification_socket=Instance('zmq.Socket')
278 _notification_socket=Instance('zmq.Socket')
279 _mux_socket=Instance('zmq.Socket')
279 _mux_socket=Instance('zmq.Socket')
280 _task_socket=Instance('zmq.Socket')
280 _task_socket=Instance('zmq.Socket')
281 _task_scheme=Unicode()
281 _task_scheme=Unicode()
282 _closed = False
282 _closed = False
283 _ignored_control_replies=Integer(0)
283 _ignored_control_replies=Integer(0)
284 _ignored_hub_replies=Integer(0)
284 _ignored_hub_replies=Integer(0)
285
285
286 def __new__(self, *args, **kw):
286 def __new__(self, *args, **kw):
287 # don't raise on positional args
287 # don't raise on positional args
288 return HasTraits.__new__(self, **kw)
288 return HasTraits.__new__(self, **kw)
289
289
290 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
290 def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
291 context=None, debug=False, exec_key=None,
291 context=None, debug=False, exec_key=None,
292 sshserver=None, sshkey=None, password=None, paramiko=None,
292 sshserver=None, sshkey=None, password=None, paramiko=None,
293 timeout=10, **extra_args
293 timeout=10, **extra_args
294 ):
294 ):
295 if profile:
295 if profile:
296 super(Client, self).__init__(debug=debug, profile=profile)
296 super(Client, self).__init__(debug=debug, profile=profile)
297 else:
297 else:
298 super(Client, self).__init__(debug=debug)
298 super(Client, self).__init__(debug=debug)
299 if context is None:
299 if context is None:
300 context = zmq.Context.instance()
300 context = zmq.Context.instance()
301 self._context = context
301 self._context = context
302
302
303 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
303 self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
304 if self._cd is not None:
304 if self._cd is not None:
305 if url_or_file is None:
305 if url_or_file is None:
306 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
306 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
307 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
307 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
308 " Please specify at least one of url_or_file or profile."
308 " Please specify at least one of url_or_file or profile."
309
309
310 if not util.is_url(url_or_file):
310 if not util.is_url(url_or_file):
311 # it's not a url, try for a file
311 # it's not a url, try for a file
312 if not os.path.exists(url_or_file):
312 if not os.path.exists(url_or_file):
313 if self._cd:
313 if self._cd:
314 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
314 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
315 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
315 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
316 with open(url_or_file) as f:
316 with open(url_or_file) as f:
317 cfg = json.loads(f.read())
317 cfg = json.loads(f.read())
318 else:
318 else:
319 cfg = {'url':url_or_file}
319 cfg = {'url':url_or_file}
320
320
321 # sync defaults from args, json:
321 # sync defaults from args, json:
322 if sshserver:
322 if sshserver:
323 cfg['ssh'] = sshserver
323 cfg['ssh'] = sshserver
324 if exec_key:
324 if exec_key:
325 cfg['exec_key'] = exec_key
325 cfg['exec_key'] = exec_key
326 exec_key = cfg['exec_key']
326 exec_key = cfg['exec_key']
327 location = cfg.setdefault('location', None)
327 location = cfg.setdefault('location', None)
328 cfg['url'] = util.disambiguate_url(cfg['url'], location)
328 cfg['url'] = util.disambiguate_url(cfg['url'], location)
329 url = cfg['url']
329 url = cfg['url']
330 proto,addr,port = util.split_url(url)
330 proto,addr,port = util.split_url(url)
331 if location is not None and addr == '127.0.0.1':
331 if location is not None and addr == '127.0.0.1':
332 # location specified, and connection is expected to be local
332 # location specified, and connection is expected to be local
333 if location not in LOCAL_IPS and not sshserver:
333 if location not in LOCAL_IPS and not sshserver:
334 # load ssh from JSON *only* if the controller is not on
334 # load ssh from JSON *only* if the controller is not on
335 # this machine
335 # this machine
336 sshserver=cfg['ssh']
336 sshserver=cfg['ssh']
337 if location not in LOCAL_IPS and not sshserver:
337 if location not in LOCAL_IPS and not sshserver:
338 # warn if no ssh specified, but SSH is probably needed
338 # warn if no ssh specified, but SSH is probably needed
339 # This is only a warning, because the most likely cause
339 # This is only a warning, because the most likely cause
340 # is a local Controller on a laptop whose IP is dynamic
340 # is a local Controller on a laptop whose IP is dynamic
341 warnings.warn("""
341 warnings.warn("""
342 Controller appears to be listening on localhost, but not on this machine.
342 Controller appears to be listening on localhost, but not on this machine.
343 If this is true, you should specify Client(...,sshserver='you@%s')
343 If this is true, you should specify Client(...,sshserver='you@%s')
344 or instruct your controller to listen on an external IP."""%location,
344 or instruct your controller to listen on an external IP."""%location,
345 RuntimeWarning)
345 RuntimeWarning)
346 elif not sshserver:
346 elif not sshserver:
347 # otherwise sync with cfg
347 # otherwise sync with cfg
348 sshserver = cfg['ssh']
348 sshserver = cfg['ssh']
349
349
350 self._config = cfg
350 self._config = cfg
351
351
352 self._ssh = bool(sshserver or sshkey or password)
352 self._ssh = bool(sshserver or sshkey or password)
353 if self._ssh and sshserver is None:
353 if self._ssh and sshserver is None:
354 # default to ssh via localhost
354 # default to ssh via localhost
355 sshserver = url.split('://')[1].split(':')[0]
355 sshserver = url.split('://')[1].split(':')[0]
356 if self._ssh and password is None:
356 if self._ssh and password is None:
357 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
357 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
358 password=False
358 password=False
359 else:
359 else:
360 password = getpass("SSH Password for %s: "%sshserver)
360 password = getpass("SSH Password for %s: "%sshserver)
361 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
361 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
362
362
363 # configure and construct the session
363 # configure and construct the session
364 if exec_key is not None:
364 if exec_key is not None:
365 if os.path.isfile(exec_key):
365 if os.path.isfile(exec_key):
366 extra_args['keyfile'] = exec_key
366 extra_args['keyfile'] = exec_key
367 else:
367 else:
368 exec_key = util.asbytes(exec_key)
368 exec_key = util.asbytes(exec_key)
369 extra_args['key'] = exec_key
369 extra_args['key'] = exec_key
370 self.session = Session(**extra_args)
370 self.session = Session(**extra_args)
371
371
372 self._query_socket = self._context.socket(zmq.DEALER)
372 self._query_socket = self._context.socket(zmq.DEALER)
373 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
373 self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
374 if self._ssh:
374 if self._ssh:
375 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
375 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
376 else:
376 else:
377 self._query_socket.connect(url)
377 self._query_socket.connect(url)
378
378
379 self.session.debug = self.debug
379 self.session.debug = self.debug
380
380
381 self._notification_handlers = {'registration_notification' : self._register_engine,
381 self._notification_handlers = {'registration_notification' : self._register_engine,
382 'unregistration_notification' : self._unregister_engine,
382 'unregistration_notification' : self._unregister_engine,
383 'shutdown_notification' : lambda msg: self.close(),
383 'shutdown_notification' : lambda msg: self.close(),
384 }
384 }
385 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
385 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
386 'apply_reply' : self._handle_apply_reply}
386 'apply_reply' : self._handle_apply_reply}
387 self._connect(sshserver, ssh_kwargs, timeout)
387 self._connect(sshserver, ssh_kwargs, timeout)
388
388
389 def __del__(self):
389 def __del__(self):
390 """cleanup sockets, but _not_ context."""
390 """cleanup sockets, but _not_ context."""
391 self.close()
391 self.close()
392
392
393 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
393 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
394 if ipython_dir is None:
394 if ipython_dir is None:
395 ipython_dir = get_ipython_dir()
395 ipython_dir = get_ipython_dir()
396 if profile_dir is not None:
396 if profile_dir is not None:
397 try:
397 try:
398 self._cd = ProfileDir.find_profile_dir(profile_dir)
398 self._cd = ProfileDir.find_profile_dir(profile_dir)
399 return
399 return
400 except ProfileDirError:
400 except ProfileDirError:
401 pass
401 pass
402 elif profile is not None:
402 elif profile is not None:
403 try:
403 try:
404 self._cd = ProfileDir.find_profile_dir_by_name(
404 self._cd = ProfileDir.find_profile_dir_by_name(
405 ipython_dir, profile)
405 ipython_dir, profile)
406 return
406 return
407 except ProfileDirError:
407 except ProfileDirError:
408 pass
408 pass
409 self._cd = None
409 self._cd = None
410
410
411 def _update_engines(self, engines):
411 def _update_engines(self, engines):
412 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
412 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
413 for k,v in engines.iteritems():
413 for k,v in engines.iteritems():
414 eid = int(k)
414 eid = int(k)
415 self._engines[eid] = v
415 self._engines[eid] = v
416 self._ids.append(eid)
416 self._ids.append(eid)
417 self._ids = sorted(self._ids)
417 self._ids = sorted(self._ids)
418 if sorted(self._engines.keys()) != range(len(self._engines)) and \
418 if sorted(self._engines.keys()) != range(len(self._engines)) and \
419 self._task_scheme == 'pure' and self._task_socket:
419 self._task_scheme == 'pure' and self._task_socket:
420 self._stop_scheduling_tasks()
420 self._stop_scheduling_tasks()
421
421
422 def _stop_scheduling_tasks(self):
422 def _stop_scheduling_tasks(self):
423 """Stop scheduling tasks because an engine has been unregistered
423 """Stop scheduling tasks because an engine has been unregistered
424 from a pure ZMQ scheduler.
424 from a pure ZMQ scheduler.
425 """
425 """
426 self._task_socket.close()
426 self._task_socket.close()
427 self._task_socket = None
427 self._task_socket = None
428 msg = "An engine has been unregistered, and we are using pure " +\
428 msg = "An engine has been unregistered, and we are using pure " +\
429 "ZMQ task scheduling. Task farming will be disabled."
429 "ZMQ task scheduling. Task farming will be disabled."
430 if self.outstanding:
430 if self.outstanding:
431 msg += " If you were running tasks when this happened, " +\
431 msg += " If you were running tasks when this happened, " +\
432 "some `outstanding` msg_ids may never resolve."
432 "some `outstanding` msg_ids may never resolve."
433 warnings.warn(msg, RuntimeWarning)
433 warnings.warn(msg, RuntimeWarning)
434
434
435 def _build_targets(self, targets):
435 def _build_targets(self, targets):
436 """Turn valid target IDs or 'all' into two lists:
436 """Turn valid target IDs or 'all' into two lists:
437 (int_ids, uuids).
437 (int_ids, uuids).
438 """
438 """
439 if not self._ids:
439 if not self._ids:
440 # flush notification socket if no engines yet, just in case
440 # flush notification socket if no engines yet, just in case
441 if not self.ids:
441 if not self.ids:
442 raise error.NoEnginesRegistered("Can't build targets without any engines")
442 raise error.NoEnginesRegistered("Can't build targets without any engines")
443
443
444 if targets is None:
444 if targets is None:
445 targets = self._ids
445 targets = self._ids
446 elif isinstance(targets, basestring):
446 elif isinstance(targets, basestring):
447 if targets.lower() == 'all':
447 if targets.lower() == 'all':
448 targets = self._ids
448 targets = self._ids
449 else:
449 else:
450 raise TypeError("%r not valid str target, must be 'all'"%(targets))
450 raise TypeError("%r not valid str target, must be 'all'"%(targets))
451 elif isinstance(targets, int):
451 elif isinstance(targets, int):
452 if targets < 0:
452 if targets < 0:
453 targets = self.ids[targets]
453 targets = self.ids[targets]
454 if targets not in self._ids:
454 if targets not in self._ids:
455 raise IndexError("No such engine: %i"%targets)
455 raise IndexError("No such engine: %i"%targets)
456 targets = [targets]
456 targets = [targets]
457
457
458 if isinstance(targets, slice):
458 if isinstance(targets, slice):
459 indices = range(len(self._ids))[targets]
459 indices = range(len(self._ids))[targets]
460 ids = self.ids
460 ids = self.ids
461 targets = [ ids[i] for i in indices ]
461 targets = [ ids[i] for i in indices ]
462
462
463 if not isinstance(targets, (tuple, list, xrange)):
463 if not isinstance(targets, (tuple, list, xrange)):
464 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
464 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
465
465
466 return [util.asbytes(self._engines[t]) for t in targets], list(targets)
466 return [util.asbytes(self._engines[t]) for t in targets], list(targets)
467
467
468 def _connect(self, sshserver, ssh_kwargs, timeout):
468 def _connect(self, sshserver, ssh_kwargs, timeout):
469 """setup all our socket connections to the cluster. This is called from
469 """setup all our socket connections to the cluster. This is called from
470 __init__."""
470 __init__."""
471
471
472 # Maybe allow reconnecting?
472 # Maybe allow reconnecting?
473 if self._connected:
473 if self._connected:
474 return
474 return
475 self._connected=True
475 self._connected=True
476
476
477 def connect_socket(s, url):
477 def connect_socket(s, url):
478 url = util.disambiguate_url(url, self._config['location'])
478 url = util.disambiguate_url(url, self._config['location'])
479 if self._ssh:
479 if self._ssh:
480 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
480 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
481 else:
481 else:
482 return s.connect(url)
482 return s.connect(url)
483
483
484 self.session.send(self._query_socket, 'connection_request')
484 self.session.send(self._query_socket, 'connection_request')
485 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
485 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
486 poller = zmq.Poller()
486 poller = zmq.Poller()
487 poller.register(self._query_socket, zmq.POLLIN)
487 poller.register(self._query_socket, zmq.POLLIN)
488 # poll expects milliseconds, timeout is seconds
488 # poll expects milliseconds, timeout is seconds
489 evts = poller.poll(timeout*1000)
489 evts = poller.poll(timeout*1000)
490 if not evts:
490 if not evts:
491 raise error.TimeoutError("Hub connection request timed out")
491 raise error.TimeoutError("Hub connection request timed out")
492 idents,msg = self.session.recv(self._query_socket,mode=0)
492 idents,msg = self.session.recv(self._query_socket,mode=0)
493 if self.debug:
493 if self.debug:
494 pprint(msg)
494 pprint(msg)
495 msg = Message(msg)
495 msg = Message(msg)
496 content = msg.content
496 content = msg.content
497 self._config['registration'] = dict(content)
497 self._config['registration'] = dict(content)
498 if content.status == 'ok':
498 if content.status == 'ok':
499 ident = self.session.bsession
499 ident = self.session.bsession
500 if content.mux:
500 if content.mux:
501 self._mux_socket = self._context.socket(zmq.DEALER)
501 self._mux_socket = self._context.socket(zmq.DEALER)
502 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
502 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
503 connect_socket(self._mux_socket, content.mux)
503 connect_socket(self._mux_socket, content.mux)
504 if content.task:
504 if content.task:
505 self._task_scheme, task_addr = content.task
505 self._task_scheme, task_addr = content.task
506 self._task_socket = self._context.socket(zmq.DEALER)
506 self._task_socket = self._context.socket(zmq.DEALER)
507 self._task_socket.setsockopt(zmq.IDENTITY, ident)
507 self._task_socket.setsockopt(zmq.IDENTITY, ident)
508 connect_socket(self._task_socket, task_addr)
508 connect_socket(self._task_socket, task_addr)
509 if content.notification:
509 if content.notification:
510 self._notification_socket = self._context.socket(zmq.SUB)
510 self._notification_socket = self._context.socket(zmq.SUB)
511 connect_socket(self._notification_socket, content.notification)
511 connect_socket(self._notification_socket, content.notification)
512 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
512 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
513 # if content.query:
513 # if content.query:
514 # self._query_socket = self._context.socket(zmq.DEALER)
514 # self._query_socket = self._context.socket(zmq.DEALER)
515 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
515 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
516 # connect_socket(self._query_socket, content.query)
516 # connect_socket(self._query_socket, content.query)
517 if content.control:
517 if content.control:
518 self._control_socket = self._context.socket(zmq.DEALER)
518 self._control_socket = self._context.socket(zmq.DEALER)
519 self._control_socket.setsockopt(zmq.IDENTITY, ident)
519 self._control_socket.setsockopt(zmq.IDENTITY, ident)
520 connect_socket(self._control_socket, content.control)
520 connect_socket(self._control_socket, content.control)
521 if content.iopub:
521 if content.iopub:
522 self._iopub_socket = self._context.socket(zmq.SUB)
522 self._iopub_socket = self._context.socket(zmq.SUB)
523 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
523 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
524 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
524 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
525 connect_socket(self._iopub_socket, content.iopub)
525 connect_socket(self._iopub_socket, content.iopub)
526 self._update_engines(dict(content.engines))
526 self._update_engines(dict(content.engines))
527 else:
527 else:
528 self._connected = False
528 self._connected = False
529 raise Exception("Failed to connect!")
529 raise Exception("Failed to connect!")
530
530
531 #--------------------------------------------------------------------------
531 #--------------------------------------------------------------------------
532 # handlers and callbacks for incoming messages
532 # handlers and callbacks for incoming messages
533 #--------------------------------------------------------------------------
533 #--------------------------------------------------------------------------
534
534
535 def _unwrap_exception(self, content):
535 def _unwrap_exception(self, content):
536 """unwrap exception, and remap engine_id to int."""
536 """unwrap exception, and remap engine_id to int."""
537 e = error.unwrap_exception(content)
537 e = error.unwrap_exception(content)
538 # print e.traceback
538 # print e.traceback
539 if e.engine_info:
539 if e.engine_info:
540 e_uuid = e.engine_info['engine_uuid']
540 e_uuid = e.engine_info['engine_uuid']
541 eid = self._engines[e_uuid]
541 eid = self._engines[e_uuid]
542 e.engine_info['engine_id'] = eid
542 e.engine_info['engine_id'] = eid
543 return e
543 return e
544
544
545 def _extract_metadata(self, header, parent, content):
545 def _extract_metadata(self, header, parent, content):
546 md = {'msg_id' : parent['msg_id'],
546 md = {'msg_id' : parent['msg_id'],
547 'received' : datetime.now(),
547 'received' : datetime.now(),
548 'engine_uuid' : header.get('engine', None),
548 'engine_uuid' : header.get('engine', None),
549 'follow' : parent.get('follow', []),
549 'follow' : parent.get('follow', []),
550 'after' : parent.get('after', []),
550 'after' : parent.get('after', []),
551 'status' : content['status'],
551 'status' : content['status'],
552 }
552 }
553
553
554 if md['engine_uuid'] is not None:
554 if md['engine_uuid'] is not None:
555 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
555 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
556
556
557 if 'date' in parent:
557 if 'date' in parent:
558 md['submitted'] = parent['date']
558 md['submitted'] = parent['date']
559 if 'started' in header:
559 if 'started' in header:
560 md['started'] = header['started']
560 md['started'] = header['started']
561 if 'date' in header:
561 if 'date' in header:
562 md['completed'] = header['date']
562 md['completed'] = header['date']
563 return md
563 return md
564
564
565 def _register_engine(self, msg):
565 def _register_engine(self, msg):
566 """Register a new engine, and update our connection info."""
566 """Register a new engine, and update our connection info."""
567 content = msg['content']
567 content = msg['content']
568 eid = content['id']
568 eid = content['id']
569 d = {eid : content['queue']}
569 d = {eid : content['queue']}
570 self._update_engines(d)
570 self._update_engines(d)
571
571
572 def _unregister_engine(self, msg):
572 def _unregister_engine(self, msg):
573 """Unregister an engine that has died."""
573 """Unregister an engine that has died."""
574 content = msg['content']
574 content = msg['content']
575 eid = int(content['id'])
575 eid = int(content['id'])
576 if eid in self._ids:
576 if eid in self._ids:
577 self._ids.remove(eid)
577 self._ids.remove(eid)
578 uuid = self._engines.pop(eid)
578 uuid = self._engines.pop(eid)
579
579
580 self._handle_stranded_msgs(eid, uuid)
580 self._handle_stranded_msgs(eid, uuid)
581
581
582 if self._task_socket and self._task_scheme == 'pure':
582 if self._task_socket and self._task_scheme == 'pure':
583 self._stop_scheduling_tasks()
583 self._stop_scheduling_tasks()
584
584
585 def _handle_stranded_msgs(self, eid, uuid):
585 def _handle_stranded_msgs(self, eid, uuid):
586 """Handle messages known to be on an engine when the engine unregisters.
586 """Handle messages known to be on an engine when the engine unregisters.
587
587
588 It is possible that this will fire prematurely - that is, an engine will
588 It is possible that this will fire prematurely - that is, an engine will
589 go down after completing a result, and the client will be notified
589 go down after completing a result, and the client will be notified
590 of the unregistration and later receive the successful result.
590 of the unregistration and later receive the successful result.
591 """
591 """
592
592
593 outstanding = self._outstanding_dict[uuid]
593 outstanding = self._outstanding_dict[uuid]
594
594
595 for msg_id in list(outstanding):
595 for msg_id in list(outstanding):
596 if msg_id in self.results:
596 if msg_id in self.results:
597 # we already
597 # we already
598 continue
598 continue
599 try:
599 try:
600 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
600 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
601 except:
601 except:
602 content = error.wrap_exception()
602 content = error.wrap_exception()
603 # build a fake message:
603 # build a fake message:
604 parent = {}
604 parent = {}
605 header = {}
605 header = {}
606 parent['msg_id'] = msg_id
606 parent['msg_id'] = msg_id
607 header['engine'] = uuid
607 header['engine'] = uuid
608 header['date'] = datetime.now()
608 header['date'] = datetime.now()
609 msg = dict(parent_header=parent, header=header, content=content)
609 msg = dict(parent_header=parent, header=header, content=content)
610 self._handle_apply_reply(msg)
610 self._handle_apply_reply(msg)
611
611
612 def _handle_execute_reply(self, msg):
612 def _handle_execute_reply(self, msg):
613 """Save the reply to an execute_request into our results.
613 """Save the reply to an execute_request into our results.
614
614
615 execute messages are never actually used. apply is used instead.
615 execute messages are never actually used. apply is used instead.
616 """
616 """
617
617
618 parent = msg['parent_header']
618 parent = msg['parent_header']
619 msg_id = parent['msg_id']
619 msg_id = parent['msg_id']
620 if msg_id not in self.outstanding:
620 if msg_id not in self.outstanding:
621 if msg_id in self.history:
621 if msg_id in self.history:
622 print ("got stale result: %s"%msg_id)
622 print ("got stale result: %s"%msg_id)
623 else:
623 else:
624 print ("got unknown result: %s"%msg_id)
624 print ("got unknown result: %s"%msg_id)
625 else:
625 else:
626 self.outstanding.remove(msg_id)
626 self.outstanding.remove(msg_id)
627 self.results[msg_id] = self._unwrap_exception(msg['content'])
627 self.results[msg_id] = self._unwrap_exception(msg['content'])
628
628
629 def _handle_apply_reply(self, msg):
629 def _handle_apply_reply(self, msg):
630 """Save the reply to an apply_request into our results."""
630 """Save the reply to an apply_request into our results."""
631 parent = msg['parent_header']
631 parent = msg['parent_header']
632 msg_id = parent['msg_id']
632 msg_id = parent['msg_id']
633 if msg_id not in self.outstanding:
633 if msg_id not in self.outstanding:
634 if msg_id in self.history:
634 if msg_id in self.history:
635 print ("got stale result: %s"%msg_id)
635 print ("got stale result: %s"%msg_id)
636 print self.results[msg_id]
636 print self.results[msg_id]
637 print msg
637 print msg
638 else:
638 else:
639 print ("got unknown result: %s"%msg_id)
639 print ("got unknown result: %s"%msg_id)
640 else:
640 else:
641 self.outstanding.remove(msg_id)
641 self.outstanding.remove(msg_id)
642 content = msg['content']
642 content = msg['content']
643 header = msg['header']
643 header = msg['header']
644
644
645 # construct metadata:
645 # construct metadata:
646 md = self.metadata[msg_id]
646 md = self.metadata[msg_id]
647 md.update(self._extract_metadata(header, parent, content))
647 md.update(self._extract_metadata(header, parent, content))
648 # is this redundant?
648 # is this redundant?
649 self.metadata[msg_id] = md
649 self.metadata[msg_id] = md
650
650
651 e_outstanding = self._outstanding_dict[md['engine_uuid']]
651 e_outstanding = self._outstanding_dict[md['engine_uuid']]
652 if msg_id in e_outstanding:
652 if msg_id in e_outstanding:
653 e_outstanding.remove(msg_id)
653 e_outstanding.remove(msg_id)
654
654
655 # construct result:
655 # construct result:
656 if content['status'] == 'ok':
656 if content['status'] == 'ok':
657 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
657 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
658 elif content['status'] == 'aborted':
658 elif content['status'] == 'aborted':
659 self.results[msg_id] = error.TaskAborted(msg_id)
659 self.results[msg_id] = error.TaskAborted(msg_id)
660 elif content['status'] == 'resubmitted':
660 elif content['status'] == 'resubmitted':
661 # TODO: handle resubmission
661 # TODO: handle resubmission
662 pass
662 pass
663 else:
663 else:
664 self.results[msg_id] = self._unwrap_exception(content)
664 self.results[msg_id] = self._unwrap_exception(content)
665
665
666 def _flush_notifications(self):
666 def _flush_notifications(self):
667 """Flush notifications of engine registrations waiting
667 """Flush notifications of engine registrations waiting
668 in ZMQ queue."""
668 in ZMQ queue."""
669 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
669 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
670 while msg is not None:
670 while msg is not None:
671 if self.debug:
671 if self.debug:
672 pprint(msg)
672 pprint(msg)
673 msg_type = msg['header']['msg_type']
673 msg_type = msg['header']['msg_type']
674 handler = self._notification_handlers.get(msg_type, None)
674 handler = self._notification_handlers.get(msg_type, None)
675 if handler is None:
675 if handler is None:
676 raise Exception("Unhandled message type: %s"%msg.msg_type)
676 raise Exception("Unhandled message type: %s"%msg.msg_type)
677 else:
677 else:
678 handler(msg)
678 handler(msg)
679 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
679 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
680
680
681 def _flush_results(self, sock):
681 def _flush_results(self, sock):
682 """Flush task or queue results waiting in ZMQ queue."""
682 """Flush task or queue results waiting in ZMQ queue."""
683 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
683 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
684 while msg is not None:
684 while msg is not None:
685 if self.debug:
685 if self.debug:
686 pprint(msg)
686 pprint(msg)
687 msg_type = msg['header']['msg_type']
687 msg_type = msg['header']['msg_type']
688 handler = self._queue_handlers.get(msg_type, None)
688 handler = self._queue_handlers.get(msg_type, None)
689 if handler is None:
689 if handler is None:
690 raise Exception("Unhandled message type: %s"%msg.msg_type)
690 raise Exception("Unhandled message type: %s"%msg.msg_type)
691 else:
691 else:
692 handler(msg)
692 handler(msg)
693 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
693 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
694
694
695 def _flush_control(self, sock):
695 def _flush_control(self, sock):
696 """Flush replies from the control channel waiting
696 """Flush replies from the control channel waiting
697 in the ZMQ queue.
697 in the ZMQ queue.
698
698
699 Currently: ignore them."""
699 Currently: ignore them."""
700 if self._ignored_control_replies <= 0:
700 if self._ignored_control_replies <= 0:
701 return
701 return
702 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
702 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
703 while msg is not None:
703 while msg is not None:
704 self._ignored_control_replies -= 1
704 self._ignored_control_replies -= 1
705 if self.debug:
705 if self.debug:
706 pprint(msg)
706 pprint(msg)
707 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
707 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
708
708
709 def _flush_ignored_control(self):
709 def _flush_ignored_control(self):
710 """flush ignored control replies"""
710 """flush ignored control replies"""
711 while self._ignored_control_replies > 0:
711 while self._ignored_control_replies > 0:
712 self.session.recv(self._control_socket)
712 self.session.recv(self._control_socket)
713 self._ignored_control_replies -= 1
713 self._ignored_control_replies -= 1
714
714
715 def _flush_ignored_hub_replies(self):
715 def _flush_ignored_hub_replies(self):
716 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
716 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
717 while msg is not None:
717 while msg is not None:
718 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
718 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
719
719
720 def _flush_iopub(self, sock):
720 def _flush_iopub(self, sock):
721 """Flush replies from the iopub channel waiting
721 """Flush replies from the iopub channel waiting
722 in the ZMQ queue.
722 in the ZMQ queue.
723 """
723 """
724 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
724 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
725 while msg is not None:
725 while msg is not None:
726 if self.debug:
726 if self.debug:
727 pprint(msg)
727 pprint(msg)
728 parent = msg['parent_header']
728 parent = msg['parent_header']
729 # ignore IOPub messages with no parent.
729 # ignore IOPub messages with no parent.
730 # Caused by print statements or warnings from before the first execution.
730 # Caused by print statements or warnings from before the first execution.
731 if not parent:
731 if not parent:
732 continue
732 continue
733 msg_id = parent['msg_id']
733 msg_id = parent['msg_id']
734 content = msg['content']
734 content = msg['content']
735 header = msg['header']
735 header = msg['header']
736 msg_type = msg['header']['msg_type']
736 msg_type = msg['header']['msg_type']
737
737
738 # init metadata:
738 # init metadata:
739 md = self.metadata[msg_id]
739 md = self.metadata[msg_id]
740
740
741 if msg_type == 'stream':
741 if msg_type == 'stream':
742 name = content['name']
742 name = content['name']
743 s = md[name] or ''
743 s = md[name] or ''
744 md[name] = s + content['data']
744 md[name] = s + content['data']
745 elif msg_type == 'pyerr':
745 elif msg_type == 'pyerr':
746 md.update({'pyerr' : self._unwrap_exception(content)})
746 md.update({'pyerr' : self._unwrap_exception(content)})
747 elif msg_type == 'pyin':
747 elif msg_type == 'pyin':
748 md.update({'pyin' : content['code']})
748 md.update({'pyin' : content['code']})
749 else:
749 else:
750 md.update({msg_type : content.get('data', '')})
750 md.update({msg_type : content.get('data', '')})
751
751
752 # reduntant?
752 # reduntant?
753 self.metadata[msg_id] = md
753 self.metadata[msg_id] = md
754
754
755 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
755 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
756
756
757 #--------------------------------------------------------------------------
757 #--------------------------------------------------------------------------
758 # len, getitem
758 # len, getitem
759 #--------------------------------------------------------------------------
759 #--------------------------------------------------------------------------
760
760
761 def __len__(self):
761 def __len__(self):
762 """len(client) returns # of engines."""
762 """len(client) returns # of engines."""
763 return len(self.ids)
763 return len(self.ids)
764
764
765 def __getitem__(self, key):
765 def __getitem__(self, key):
766 """index access returns DirectView multiplexer objects
766 """index access returns DirectView multiplexer objects
767
767
768 Must be int, slice, or list/tuple/xrange of ints"""
768 Must be int, slice, or list/tuple/xrange of ints"""
769 if not isinstance(key, (int, slice, tuple, list, xrange)):
769 if not isinstance(key, (int, slice, tuple, list, xrange)):
770 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
770 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
771 else:
771 else:
772 return self.direct_view(key)
772 return self.direct_view(key)
773
773
774 #--------------------------------------------------------------------------
774 #--------------------------------------------------------------------------
775 # Begin public methods
775 # Begin public methods
776 #--------------------------------------------------------------------------
776 #--------------------------------------------------------------------------
777
777
778 @property
778 @property
779 def ids(self):
779 def ids(self):
780 """Always up-to-date ids property."""
780 """Always up-to-date ids property."""
781 self._flush_notifications()
781 self._flush_notifications()
782 # always copy:
782 # always copy:
783 return list(self._ids)
783 return list(self._ids)
784
784
785 def close(self):
785 def close(self):
786 if self._closed:
786 if self._closed:
787 return
787 return
788 snames = filter(lambda n: n.endswith('socket'), dir(self))
788 snames = filter(lambda n: n.endswith('socket'), dir(self))
789 for socket in map(lambda name: getattr(self, name), snames):
789 for socket in map(lambda name: getattr(self, name), snames):
790 if isinstance(socket, zmq.Socket) and not socket.closed:
790 if isinstance(socket, zmq.Socket) and not socket.closed:
791 socket.close()
791 socket.close()
792 self._closed = True
792 self._closed = True
793
793
794 def spin(self):
794 def spin(self):
795 """Flush any registration notifications and execution results
795 """Flush any registration notifications and execution results
796 waiting in the ZMQ queue.
796 waiting in the ZMQ queue.
797 """
797 """
798 if self._notification_socket:
798 if self._notification_socket:
799 self._flush_notifications()
799 self._flush_notifications()
800 if self._mux_socket:
800 if self._mux_socket:
801 self._flush_results(self._mux_socket)
801 self._flush_results(self._mux_socket)
802 if self._task_socket:
802 if self._task_socket:
803 self._flush_results(self._task_socket)
803 self._flush_results(self._task_socket)
804 if self._control_socket:
804 if self._control_socket:
805 self._flush_control(self._control_socket)
805 self._flush_control(self._control_socket)
806 if self._iopub_socket:
806 if self._iopub_socket:
807 self._flush_iopub(self._iopub_socket)
807 self._flush_iopub(self._iopub_socket)
808 if self._query_socket:
808 if self._query_socket:
809 self._flush_ignored_hub_replies()
809 self._flush_ignored_hub_replies()
810
810
811 def wait(self, jobs=None, timeout=-1):
811 def wait(self, jobs=None, timeout=-1):
812 """waits on one or more `jobs`, for up to `timeout` seconds.
812 """waits on one or more `jobs`, for up to `timeout` seconds.
813
813
814 Parameters
814 Parameters
815 ----------
815 ----------
816
816
817 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
817 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
818 ints are indices to self.history
818 ints are indices to self.history
819 strs are msg_ids
819 strs are msg_ids
820 default: wait on all outstanding messages
820 default: wait on all outstanding messages
821 timeout : float
821 timeout : float
822 a time in seconds, after which to give up.
822 a time in seconds, after which to give up.
823 default is -1, which means no timeout
823 default is -1, which means no timeout
824
824
825 Returns
825 Returns
826 -------
826 -------
827
827
828 True : when all msg_ids are done
828 True : when all msg_ids are done
829 False : timeout reached, some msg_ids still outstanding
829 False : timeout reached, some msg_ids still outstanding
830 """
830 """
831 tic = time.time()
831 tic = time.time()
832 if jobs is None:
832 if jobs is None:
833 theids = self.outstanding
833 theids = self.outstanding
834 else:
834 else:
835 if isinstance(jobs, (int, basestring, AsyncResult)):
835 if isinstance(jobs, (int, basestring, AsyncResult)):
836 jobs = [jobs]
836 jobs = [jobs]
837 theids = set()
837 theids = set()
838 for job in jobs:
838 for job in jobs:
839 if isinstance(job, int):
839 if isinstance(job, int):
840 # index access
840 # index access
841 job = self.history[job]
841 job = self.history[job]
842 elif isinstance(job, AsyncResult):
842 elif isinstance(job, AsyncResult):
843 map(theids.add, job.msg_ids)
843 map(theids.add, job.msg_ids)
844 continue
844 continue
845 theids.add(job)
845 theids.add(job)
846 if not theids.intersection(self.outstanding):
846 if not theids.intersection(self.outstanding):
847 return True
847 return True
848 self.spin()
848 self.spin()
849 while theids.intersection(self.outstanding):
849 while theids.intersection(self.outstanding):
850 if timeout >= 0 and ( time.time()-tic ) > timeout:
850 if timeout >= 0 and ( time.time()-tic ) > timeout:
851 break
851 break
852 time.sleep(1e-3)
852 time.sleep(1e-3)
853 self.spin()
853 self.spin()
854 return len(theids.intersection(self.outstanding)) == 0
854 return len(theids.intersection(self.outstanding)) == 0
855
855
856 #--------------------------------------------------------------------------
856 #--------------------------------------------------------------------------
857 # Control methods
857 # Control methods
858 #--------------------------------------------------------------------------
858 #--------------------------------------------------------------------------
859
859
860 @spin_first
860 @spin_first
861 def clear(self, targets=None, block=None):
861 def clear(self, targets=None, block=None):
862 """Clear the namespace in target(s)."""
862 """Clear the namespace in target(s)."""
863 block = self.block if block is None else block
863 block = self.block if block is None else block
864 targets = self._build_targets(targets)[0]
864 targets = self._build_targets(targets)[0]
865 for t in targets:
865 for t in targets:
866 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
866 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
867 error = False
867 error = False
868 if block:
868 if block:
869 self._flush_ignored_control()
869 self._flush_ignored_control()
870 for i in range(len(targets)):
870 for i in range(len(targets)):
871 idents,msg = self.session.recv(self._control_socket,0)
871 idents,msg = self.session.recv(self._control_socket,0)
872 if self.debug:
872 if self.debug:
873 pprint(msg)
873 pprint(msg)
874 if msg['content']['status'] != 'ok':
874 if msg['content']['status'] != 'ok':
875 error = self._unwrap_exception(msg['content'])
875 error = self._unwrap_exception(msg['content'])
876 else:
876 else:
877 self._ignored_control_replies += len(targets)
877 self._ignored_control_replies += len(targets)
878 if error:
878 if error:
879 raise error
879 raise error
880
880
881
881
882 @spin_first
882 @spin_first
883 def abort(self, jobs=None, targets=None, block=None):
883 def abort(self, jobs=None, targets=None, block=None):
884 """Abort specific jobs from the execution queues of target(s).
884 """Abort specific jobs from the execution queues of target(s).
885
885
886 This is a mechanism to prevent jobs that have already been submitted
886 This is a mechanism to prevent jobs that have already been submitted
887 from executing.
887 from executing.
888
888
889 Parameters
889 Parameters
890 ----------
890 ----------
891
891
892 jobs : msg_id, list of msg_ids, or AsyncResult
892 jobs : msg_id, list of msg_ids, or AsyncResult
893 The jobs to be aborted
893 The jobs to be aborted
894
894
895 If unspecified/None: abort all outstanding jobs.
895 If unspecified/None: abort all outstanding jobs.
896
896
897 """
897 """
898 block = self.block if block is None else block
898 block = self.block if block is None else block
899 jobs = jobs if jobs is not None else list(self.outstanding)
899 jobs = jobs if jobs is not None else list(self.outstanding)
900 targets = self._build_targets(targets)[0]
900 targets = self._build_targets(targets)[0]
901
901
902 msg_ids = []
902 msg_ids = []
903 if isinstance(jobs, (basestring,AsyncResult)):
903 if isinstance(jobs, (basestring,AsyncResult)):
904 jobs = [jobs]
904 jobs = [jobs]
905 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
905 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
906 if bad_ids:
906 if bad_ids:
907 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
907 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
908 for j in jobs:
908 for j in jobs:
909 if isinstance(j, AsyncResult):
909 if isinstance(j, AsyncResult):
910 msg_ids.extend(j.msg_ids)
910 msg_ids.extend(j.msg_ids)
911 else:
911 else:
912 msg_ids.append(j)
912 msg_ids.append(j)
913 content = dict(msg_ids=msg_ids)
913 content = dict(msg_ids=msg_ids)
914 for t in targets:
914 for t in targets:
915 self.session.send(self._control_socket, 'abort_request',
915 self.session.send(self._control_socket, 'abort_request',
916 content=content, ident=t)
916 content=content, ident=t)
917 error = False
917 error = False
918 if block:
918 if block:
919 self._flush_ignored_control()
919 self._flush_ignored_control()
920 for i in range(len(targets)):
920 for i in range(len(targets)):
921 idents,msg = self.session.recv(self._control_socket,0)
921 idents,msg = self.session.recv(self._control_socket,0)
922 if self.debug:
922 if self.debug:
923 pprint(msg)
923 pprint(msg)
924 if msg['content']['status'] != 'ok':
924 if msg['content']['status'] != 'ok':
925 error = self._unwrap_exception(msg['content'])
925 error = self._unwrap_exception(msg['content'])
926 else:
926 else:
927 self._ignored_control_replies += len(targets)
927 self._ignored_control_replies += len(targets)
928 if error:
928 if error:
929 raise error
929 raise error
930
930
@spin_first
def shutdown(self, targets=None, restart=False, hub=False, block=None):
    """Terminate one or more engine processes, optionally the Hub as well.

    Parameters
    ----------

    targets : targets spec [default: None]
        which engines to shut down; forced to 'all' when `hub` is True.
    restart : bool
        forwarded to each engine in the shutdown_request content.
    hub : bool
        also ask the Hub process itself to shut down (implies blocking
        on the engine replies first).
    block : bool [default: self.block]
        wait for the engine replies before returning.
    """
    block = self.block if block is None else block
    if hub:
        # shutting down the hub implies shutting down every engine
        targets = 'all'
    targets = self._build_targets(targets)[0]
    for eid in targets:
        self.session.send(self._control_socket, 'shutdown_request',
                          content={'restart': restart}, ident=eid)
    error = False
    if block or hub:
        self._flush_ignored_control()
        # collect one reply per engine we signalled
        for _ in targets:
            idents, msg = self.session.recv(self._control_socket, 0)
            if self.debug:
                pprint(msg)
            if msg['content']['status'] != 'ok':
                error = self._unwrap_exception(msg['content'])
    else:
        self._ignored_control_replies += len(targets)

    if hub:
        # give the engines a moment to exit before stopping the Hub
        time.sleep(0.25)
        self.session.send(self._query_socket, 'shutdown_request')
        idents, msg = self.session.recv(self._query_socket, 0)
        if self.debug:
            pprint(msg)
        if msg['content']['status'] != 'ok':
            error = self._unwrap_exception(msg['content'])

    if error:
        raise error
964
964
965 #--------------------------------------------------------------------------
965 #--------------------------------------------------------------------------
966 # Execution related methods
966 # Execution related methods
967 #--------------------------------------------------------------------------
967 #--------------------------------------------------------------------------
968
968
def _maybe_raise(self, result):
    """Pass `result` through unless it is a RemoteError, in which case raise it."""
    if not isinstance(result, error.RemoteError):
        return result
    raise result
975
975
def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
                        ident=None):
    """construct and send an apply message via a socket.

    This is the principal method with which all engine execution is performed by views.
    """

    assert not self._closed, "cannot use me anymore, I'm closed!"
    # fill in defaults for the optional containers
    if args is None:
        args = []
    if kwargs is None:
        kwargs = {}
    if subheader is None:
        subheader = {}

    # validate arguments before any serialization work
    if not (callable(f) or isinstance(f, Reference)):
        raise TypeError("f must be callable, not %s" % type(f))
    if not isinstance(args, (tuple, list)):
        raise TypeError("args must be tuple or list, not %s" % type(args))
    if not isinstance(kwargs, dict):
        raise TypeError("kwargs must be dict, not %s" % type(kwargs))
    if not isinstance(subheader, dict):
        raise TypeError("subheader must be dict, not %s" % type(subheader))

    bufs = util.pack_apply_message(f, args, kwargs)

    msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
                            subheader=subheader, track=track)

    msg_id = msg['header']['msg_id']
    self.outstanding.add(msg_id)
    if ident:
        # possibly routed to a specific engine
        if isinstance(ident, list):
            ident = ident[-1]
        if ident in self._engines.values():
            # save for later, in case of engine death
            self._outstanding_dict[ident].add(msg_id)
    self.history.append(msg_id)
    self.metadata[msg_id]['submitted'] = datetime.now()

    return msg
1017
1017
1018 #--------------------------------------------------------------------------
1018 #--------------------------------------------------------------------------
1019 # construct a View object
1019 # construct a View object
1020 #--------------------------------------------------------------------------
1020 #--------------------------------------------------------------------------
1021
1021
def load_balanced_view(self, targets=None):
    """construct a LoadBalancedView object.

    If no arguments are specified, create a LoadBalancedView
    using all engines.

    Parameters
    ----------

    targets: list,slice,int,etc. [default: use all engines]
        The subset of engines across which to load-balance
    """
    # fix: docstring previously claimed this constructs a DirectView
    if targets == 'all':
        # 'all' means "no restriction": let the scheduler pick among
        # whatever engines exist at each submission
        targets = None
    if targets is not None:
        targets = self._build_targets(targets)[1]
    return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1039
1039
def direct_view(self, targets='all'):
    """construct a DirectView object.

    If no targets are specified, create a DirectView using all engines.

    rc.direct_view('all') is distinguished from rc[:] in that 'all' will
    evaluate the target engines at each execution, whereas rc[:] will connect to
    all *current* engines, and that list will not change.

    That is, 'all' will always use all engines, whereas rc[:] will not use
    engines added after the DirectView is constructed.

    Parameters
    ----------

    targets: list,slice,int,etc. [default: use all engines]
        The engines to use for the View
    """
    want_single = isinstance(targets, int)
    if targets != 'all':
        # resolve to concrete engine ids now; 'all' stays symbolic and
        # is re-evaluated lazily at each execution
        targets = self._build_targets(targets)[1]
        if want_single:
            # an int target maps to a single engine, not a 1-element list
            targets = targets[0]
    return DirectView(client=self, socket=self._mux_socket, targets=targets)
1065
1065
1066 #--------------------------------------------------------------------------
1066 #--------------------------------------------------------------------------
1067 # Query methods
1067 # Query methods
1068 #--------------------------------------------------------------------------
1068 #--------------------------------------------------------------------------
1069
1069
@spin_first
def get_result(self, indices_or_msg_ids=None, block=None):
    """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.

    If the client already has the results, no request to the Hub will be made.

    This is a convenient way to construct AsyncResult objects, which are wrappers
    that include metadata about execution, and allow for awaiting results that
    were not submitted by this Client.

    It can also be a convenient way to retrieve the metadata associated with
    blocking execution, since it always retrieves the full execution record
    for each requested msg_id.

    Examples
    --------
    ::

        In [10]: r = client.apply()

    Parameters
    ----------

    indices_or_msg_ids : integer history index, str msg_id, or list of either
        The indices or msg_ids of indices to be retrieved

    block : bool
        Whether to wait for the result to be done

    Returns
    -------

    AsyncResult
        A single AsyncResult object will always be returned.

    AsyncHubResult
        A subclass of AsyncResult that retrieves results from the Hub

    """
    block = self.block if block is None else block
    if indices_or_msg_ids is None:
        # default to the most recently submitted task
        indices_or_msg_ids = -1

    if not isinstance(indices_or_msg_ids, (list, tuple)):
        indices_or_msg_ids = [indices_or_msg_ids]

    # normalize history indices into msg_id strings
    theids = []
    for id in indices_or_msg_ids:
        if isinstance(id, int):
            id = self.history[id]
        if not isinstance(id, basestring):
            raise TypeError("indices must be str or int, not %r" % id)
        theids.append(id)

    local_ids = [msg_id for msg_id in theids
                 if msg_id in self.history or msg_id in self.results]
    remote_ids = [msg_id for msg_id in theids if msg_id not in local_ids]

    # any id unknown to this client forces a round-trip to the Hub
    if remote_ids:
        ar = AsyncHubResult(self, msg_ids=theids)
    else:
        ar = AsyncResult(self, msg_ids=theids)

    if block:
        ar.wait()

    return ar
1135
1135
@spin_first
def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
    """Resubmit one or more tasks.

    in-flight tasks may not be resubmitted.

    Parameters
    ----------

    indices_or_msg_ids : integer history index, str msg_id, or list of either
        The indices or msg_ids of indices to be retrieved

    block : bool
        Whether to wait for the result to be done

    Returns
    -------

    AsyncHubResult
        A subclass of AsyncResult that retrieves results from the Hub

    """
    block = self.block if block is None else block
    if indices_or_msg_ids is None:
        # default to the most recently submitted task
        indices_or_msg_ids = -1
    if not isinstance(indices_or_msg_ids, (list, tuple)):
        indices_or_msg_ids = [indices_or_msg_ids]

    # normalize history indices into msg_id strings
    theids = []
    for id in indices_or_msg_ids:
        if isinstance(id, int):
            id = self.history[id]
        if not isinstance(id, basestring):
            raise TypeError("indices must be str or int, not %r" % id)
        theids.append(id)

    # drop any local record of the tasks being resubmitted
    for msg_id in theids:
        self.outstanding.discard(msg_id)
        if msg_id in self.history:
            self.history.remove(msg_id)
        self.results.pop(msg_id, None)
        self.metadata.pop(msg_id, None)

    self.session.send(self._query_socket, 'resubmit_request',
                      dict(msg_ids=theids))

    # wait until the Hub's reply is actually available, then recv it
    zmq.select([self._query_socket], [], [])
    idents, msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
    if self.debug:
        pprint(msg)
    content = msg['content']
    if content['status'] != 'ok':
        raise self._unwrap_exception(content)

    ar = AsyncHubResult(self, msg_ids=theids)
    if block:
        ar.wait()

    return ar
1197
1197
1198 @spin_first
1198 @spin_first
1199 def result_status(self, msg_ids, status_only=True):
1199 def result_status(self, msg_ids, status_only=True):
1200 """Check on the status of the result(s) of the apply request with `msg_ids`.
1200 """Check on the status of the result(s) of the apply request with `msg_ids`.
1201
1201
1202 If status_only is False, then the actual results will be retrieved, else
1202 If status_only is False, then the actual results will be retrieved, else
1203 only the status of the results will be checked.
1203 only the status of the results will be checked.
1204
1204
1205 Parameters
1205 Parameters
1206 ----------
1206 ----------
1207
1207
1208 msg_ids : list of msg_ids
1208 msg_ids : list of msg_ids
1209 if int:
1209 if int:
1210 Passed as index to self.history for convenience.
1210 Passed as index to self.history for convenience.
1211 status_only : bool (default: True)
1211 status_only : bool (default: True)
1212 if False:
1212 if False:
1213 Retrieve the actual results of completed tasks.
1213 Retrieve the actual results of completed tasks.
1214
1214
1215 Returns
1215 Returns
1216 -------
1216 -------
1217
1217
1218 results : dict
1218 results : dict
1219 There will always be the keys 'pending' and 'completed', which will
1219 There will always be the keys 'pending' and 'completed', which will
1220 be lists of msg_ids that are incomplete or complete. If `status_only`
1220 be lists of msg_ids that are incomplete or complete. If `status_only`
1221 is False, then completed results will be keyed by their `msg_id`.
1221 is False, then completed results will be keyed by their `msg_id`.
1222 """
1222 """
1223 if not isinstance(msg_ids, (list,tuple)):
1223 if not isinstance(msg_ids, (list,tuple)):
1224 msg_ids = [msg_ids]
1224 msg_ids = [msg_ids]
1225
1225
1226 theids = []
1226 theids = []
1227 for msg_id in msg_ids:
1227 for msg_id in msg_ids:
1228 if isinstance(msg_id, int):
1228 if isinstance(msg_id, int):
1229 msg_id = self.history[msg_id]
1229 msg_id = self.history[msg_id]
1230 if not isinstance(msg_id, basestring):
1230 if not isinstance(msg_id, basestring):
1231 raise TypeError("msg_ids must be str, not %r"%msg_id)
1231 raise TypeError("msg_ids must be str, not %r"%msg_id)
1232 theids.append(msg_id)
1232 theids.append(msg_id)
1233
1233
1234 completed = []
1234 completed = []
1235 local_results = {}
1235 local_results = {}
1236
1236
1237 # comment this block out to temporarily disable local shortcut:
1237 # comment this block out to temporarily disable local shortcut:
1238 for msg_id in theids:
1238 for msg_id in theids:
1239 if msg_id in self.results:
1239 if msg_id in self.results:
1240 completed.append(msg_id)
1240 completed.append(msg_id)
1241 local_results[msg_id] = self.results[msg_id]
1241 local_results[msg_id] = self.results[msg_id]
1242 theids.remove(msg_id)
1242 theids.remove(msg_id)
1243
1243
1244 if theids: # some not locally cached
1244 if theids: # some not locally cached
1245 content = dict(msg_ids=theids, status_only=status_only)
1245 content = dict(msg_ids=theids, status_only=status_only)
1246 msg = self.session.send(self._query_socket, "result_request", content=content)
1246 msg = self.session.send(self._query_socket, "result_request", content=content)
1247 zmq.select([self._query_socket], [], [])
1247 zmq.select([self._query_socket], [], [])
1248 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1248 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1249 if self.debug:
1249 if self.debug:
1250 pprint(msg)
1250 pprint(msg)
1251 content = msg['content']
1251 content = msg['content']
1252 if content['status'] != 'ok':
1252 if content['status'] != 'ok':
1253 raise self._unwrap_exception(content)
1253 raise self._unwrap_exception(content)
1254 buffers = msg['buffers']
1254 buffers = msg['buffers']
1255 else:
1255 else:
1256 content = dict(completed=[],pending=[])
1256 content = dict(completed=[],pending=[])
1257
1257
1258 content['completed'].extend(completed)
1258 content['completed'].extend(completed)
1259
1259
1260 if status_only:
1260 if status_only:
1261 return content
1261 return content
1262
1262
1263 failures = []
1263 failures = []
1264 # load cached results into result:
1264 # load cached results into result:
1265 content.update(local_results)
1265 content.update(local_results)
1266
1266
1267 # update cache with results:
1267 # update cache with results:
1268 for msg_id in sorted(theids):
1268 for msg_id in sorted(theids):
1269 if msg_id in content['completed']:
1269 if msg_id in content['completed']:
1270 rec = content[msg_id]
1270 rec = content[msg_id]
1271 parent = rec['header']
1271 parent = rec['header']
1272 header = rec['result_header']
1272 header = rec['result_header']
1273 rcontent = rec['result_content']
1273 rcontent = rec['result_content']
1274 iodict = rec['io']
1274 iodict = rec['io']
1275 if isinstance(rcontent, str):
1275 if isinstance(rcontent, str):
1276 rcontent = self.session.unpack(rcontent)
1276 rcontent = self.session.unpack(rcontent)
1277
1277
1278 md = self.metadata[msg_id]
1278 md = self.metadata[msg_id]
1279 md.update(self._extract_metadata(header, parent, rcontent))
1279 md.update(self._extract_metadata(header, parent, rcontent))
1280 md.update(iodict)
1280 md.update(iodict)
1281
1281
1282 if rcontent['status'] == 'ok':
1282 if rcontent['status'] == 'ok':
1283 res,buffers = util.unserialize_object(buffers)
1283 res,buffers = util.unserialize_object(buffers)
1284 else:
1284 else:
1285 print rcontent
1285 print rcontent
1286 res = self._unwrap_exception(rcontent)
1286 res = self._unwrap_exception(rcontent)
1287 failures.append(res)
1287 failures.append(res)
1288
1288
1289 self.results[msg_id] = res
1289 self.results[msg_id] = res
1290 content[msg_id] = res
1290 content[msg_id] = res
1291
1291
1292 if len(theids) == 1 and failures:
1292 if len(theids) == 1 and failures:
1293 raise failures[0]
1293 raise failures[0]
1294
1294
1295 error.collect_exceptions(failures, "result_status")
1295 error.collect_exceptions(failures, "result_status")
1296 return content
1296 return content
1297
1297
@spin_first
def queue_status(self, targets='all', verbose=False):
    """Fetch the status of engine queues.

    Parameters
    ----------

    targets : int/str/list of ints/strs
        the engines whose states are to be queried.
        default : all
    verbose : bool
        Whether to return lengths only, or lists of ids for each element
    """
    if targets == 'all':
        # allow 'all' to be evaluated on the engine
        engine_ids = None
    else:
        engine_ids = self._build_targets(targets)[1]
    self.session.send(self._query_socket, "queue_request",
                      content=dict(targets=engine_ids, verbose=verbose))
    idents, msg = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(msg)
    content = msg['content']
    status = content.pop('status')
    if status != 'ok':
        raise self._unwrap_exception(content)
    content = rekey(content)
    # a single int target gets its own entry rather than the whole dict
    return content[targets] if isinstance(targets, int) else content
1326
1330
@spin_first
def purge_results(self, jobs=None, targets=None):
    """Tell the Hub to forget results.

    Individual results can be purged by msg_id, or the entire
    history of specific targets can be purged.

    Use `purge_results('all')` to scrub everything from the Hub's db.

    Parameters
    ----------

    jobs : str or list of str or AsyncResult objects
        the msg_ids whose results should be forgotten.
    targets : int/str/list of ints/strs
        The targets, by int_id, whose entire history is to be purged.

    default : None
    """
    # fix: mutable-default arguments ([] / []) replaced with None
    # sentinels; both are falsy, so validation below is unchanged.
    jobs = [] if jobs is None else jobs
    targets = [] if targets is None else targets
    if not targets and not jobs:
        raise ValueError("Must specify at least one of `targets` and `jobs`")
    if targets:
        targets = self._build_targets(targets)[1]

    # construct msg_ids from jobs
    if jobs == 'all':
        # the literal string 'all' is passed straight through to the Hub
        msg_ids = jobs
    else:
        msg_ids = []
        if isinstance(jobs, (basestring, AsyncResult)):
            jobs = [jobs]
        bad_ids = [obj for obj in jobs
                   if not isinstance(obj, (basestring, AsyncResult))]
        if bad_ids:
            raise TypeError("Invalid msg_id type %r, expected str or AsyncResult" % bad_ids[0])
        for j in jobs:
            if isinstance(j, AsyncResult):
                msg_ids.extend(j.msg_ids)
            else:
                msg_ids.append(j)

    content = dict(engine_ids=targets, msg_ids=msg_ids)
    self.session.send(self._query_socket, "purge_request", content=content)
    idents, msg = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(msg)
    content = msg['content']
    if content['status'] != 'ok':
        raise self._unwrap_exception(content)
1375
1379
@spin_first
def hub_history(self):
    """Get the Hub's history

    Just like the Client, the Hub has a history, which is a list of msg_ids.
    This will contain the history of all clients, and, depending on configuration,
    may contain history across multiple cluster sessions.

    Any msg_id returned here is a valid argument to `get_result`.

    Returns
    -------

    msg_ids : list of strs
        list of all msg_ids, ordered by task submission time.
    """
    self.session.send(self._query_socket, "history_request", content={})
    idents, msg = self.session.recv(self._query_socket, 0)

    if self.debug:
        pprint(msg)
    content = msg['content']
    if content['status'] != 'ok':
        raise self._unwrap_exception(content)
    return content['history']
1403
1407
@spin_first
def db_query(self, query, keys=None):
    """Query the Hub's TaskRecord database

    This will return a list of task record dicts that match `query`

    Parameters
    ----------

    query : mongodb query dict
        The search dict. See mongodb query docs for details.
    keys : list of strs [optional]
        The subset of keys to be returned. The default is to fetch everything but buffers.
        'msg_id' will *always* be included.
    """
    if isinstance(keys, basestring):
        # allow a bare key as shorthand for a one-element list
        keys = [keys]
    self.session.send(self._query_socket, "db_request",
                      content=dict(query=query, keys=keys))
    idents, msg = self.session.recv(self._query_socket, 0)
    if self.debug:
        pprint(msg)
    content = msg['content']
    if content['status'] != 'ok':
        raise self._unwrap_exception(content)

    records = content['records']

    # the reply carries one flat buffer list; re-partition it onto the
    # records using the per-record lengths reported by the Hub
    buffer_lens = content['buffer_lens']
    result_buffer_lens = content['result_buffer_lens']
    buffers = msg['buffers']
    has_bufs = buffer_lens is not None
    has_rbufs = result_buffer_lens is not None
    for i, rec in enumerate(records):
        if has_bufs:
            blen = buffer_lens[i]
            rec['buffers'], buffers = buffers[:blen], buffers[blen:]
        if has_rbufs:
            blen = result_buffer_lens[i]
            rec['result_buffers'], buffers = buffers[:blen], buffers[blen:]

    return records
1447
1451
# only the Client class is part of this module's public API
__all__ = ['Client']
@@ -1,1293 +1,1293 b''
1 """The IPython Controller Hub with 0MQ
1 """The IPython Controller Hub with 0MQ
2 This is the master object that handles connections from engines and clients,
2 This is the master object that handles connections from engines and clients,
3 and monitors traffic through the various queues.
3 and monitors traffic through the various queues.
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 """
8 """
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2010-2011 The IPython Development Team
10 # Copyright (C) 2010-2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 from __future__ import print_function
19 from __future__ import print_function
20
20
21 import sys
21 import sys
22 import time
22 import time
23 from datetime import datetime
23 from datetime import datetime
24
24
25 import zmq
25 import zmq
26 from zmq.eventloop import ioloop
26 from zmq.eventloop import ioloop
27 from zmq.eventloop.zmqstream import ZMQStream
27 from zmq.eventloop.zmqstream import ZMQStream
28
28
29 # internal:
29 # internal:
30 from IPython.utils.importstring import import_item
30 from IPython.utils.importstring import import_item
31 from IPython.utils.traitlets import (
31 from IPython.utils.traitlets import (
32 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
32 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
33 )
33 )
34
34
35 from IPython.parallel import error, util
35 from IPython.parallel import error, util
36 from IPython.parallel.factory import RegistrationFactory
36 from IPython.parallel.factory import RegistrationFactory
37
37
38 from IPython.zmq.session import SessionFactory
38 from IPython.zmq.session import SessionFactory
39
39
40 from .heartmonitor import HeartMonitor
40 from .heartmonitor import HeartMonitor
41
41
42 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
43 # Code
43 # Code
44 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
45
45
46 def _passer(*args, **kwargs):
46 def _passer(*args, **kwargs):
47 return
47 return
48
48
49 def _printer(*args, **kwargs):
49 def _printer(*args, **kwargs):
50 print (args)
50 print (args)
51 print (kwargs)
51 print (kwargs)
52
52
53 def empty_record():
53 def empty_record():
54 """Return an empty dict with all record keys."""
54 """Return an empty dict with all record keys."""
55 return {
55 return {
56 'msg_id' : None,
56 'msg_id' : None,
57 'header' : None,
57 'header' : None,
58 'content': None,
58 'content': None,
59 'buffers': None,
59 'buffers': None,
60 'submitted': None,
60 'submitted': None,
61 'client_uuid' : None,
61 'client_uuid' : None,
62 'engine_uuid' : None,
62 'engine_uuid' : None,
63 'started': None,
63 'started': None,
64 'completed': None,
64 'completed': None,
65 'resubmitted': None,
65 'resubmitted': None,
66 'result_header' : None,
66 'result_header' : None,
67 'result_content' : None,
67 'result_content' : None,
68 'result_buffers' : None,
68 'result_buffers' : None,
69 'queue' : None,
69 'queue' : None,
70 'pyin' : None,
70 'pyin' : None,
71 'pyout': None,
71 'pyout': None,
72 'pyerr': None,
72 'pyerr': None,
73 'stdout': '',
73 'stdout': '',
74 'stderr': '',
74 'stderr': '',
75 }
75 }
76
76
77 def init_record(msg):
77 def init_record(msg):
78 """Initialize a TaskRecord based on a request."""
78 """Initialize a TaskRecord based on a request."""
79 header = msg['header']
79 header = msg['header']
80 return {
80 return {
81 'msg_id' : header['msg_id'],
81 'msg_id' : header['msg_id'],
82 'header' : header,
82 'header' : header,
83 'content': msg['content'],
83 'content': msg['content'],
84 'buffers': msg['buffers'],
84 'buffers': msg['buffers'],
85 'submitted': header['date'],
85 'submitted': header['date'],
86 'client_uuid' : None,
86 'client_uuid' : None,
87 'engine_uuid' : None,
87 'engine_uuid' : None,
88 'started': None,
88 'started': None,
89 'completed': None,
89 'completed': None,
90 'resubmitted': None,
90 'resubmitted': None,
91 'result_header' : None,
91 'result_header' : None,
92 'result_content' : None,
92 'result_content' : None,
93 'result_buffers' : None,
93 'result_buffers' : None,
94 'queue' : None,
94 'queue' : None,
95 'pyin' : None,
95 'pyin' : None,
96 'pyout': None,
96 'pyout': None,
97 'pyerr': None,
97 'pyerr': None,
98 'stdout': '',
98 'stdout': '',
99 'stderr': '',
99 'stderr': '',
100 }
100 }
101
101
102
102
103 class EngineConnector(HasTraits):
103 class EngineConnector(HasTraits):
104 """A simple object for accessing the various zmq connections of an object.
104 """A simple object for accessing the various zmq connections of an object.
105 Attributes are:
105 Attributes are:
106 id (int): engine ID
106 id (int): engine ID
107 uuid (str): uuid (unused?)
107 uuid (str): uuid (unused?)
108 queue (str): identity of queue's XREQ socket
108 queue (str): identity of queue's XREQ socket
109 registration (str): identity of registration XREQ socket
109 registration (str): identity of registration XREQ socket
110 heartbeat (str): identity of heartbeat XREQ socket
110 heartbeat (str): identity of heartbeat XREQ socket
111 """
111 """
112 id=Integer(0)
112 id=Integer(0)
113 queue=CBytes()
113 queue=CBytes()
114 control=CBytes()
114 control=CBytes()
115 registration=CBytes()
115 registration=CBytes()
116 heartbeat=CBytes()
116 heartbeat=CBytes()
117 pending=Set()
117 pending=Set()
118
118
119 class HubFactory(RegistrationFactory):
119 class HubFactory(RegistrationFactory):
120 """The Configurable for setting up a Hub."""
120 """The Configurable for setting up a Hub."""
121
121
122 # port-pairs for monitoredqueues:
122 # port-pairs for monitoredqueues:
123 hb = Tuple(Integer,Integer,config=True,
123 hb = Tuple(Integer,Integer,config=True,
124 help="""XREQ/SUB Port pair for Engine heartbeats""")
124 help="""XREQ/SUB Port pair for Engine heartbeats""")
125 def _hb_default(self):
125 def _hb_default(self):
126 return tuple(util.select_random_ports(2))
126 return tuple(util.select_random_ports(2))
127
127
128 mux = Tuple(Integer,Integer,config=True,
128 mux = Tuple(Integer,Integer,config=True,
129 help="""Engine/Client Port pair for MUX queue""")
129 help="""Engine/Client Port pair for MUX queue""")
130
130
131 def _mux_default(self):
131 def _mux_default(self):
132 return tuple(util.select_random_ports(2))
132 return tuple(util.select_random_ports(2))
133
133
134 task = Tuple(Integer,Integer,config=True,
134 task = Tuple(Integer,Integer,config=True,
135 help="""Engine/Client Port pair for Task queue""")
135 help="""Engine/Client Port pair for Task queue""")
136 def _task_default(self):
136 def _task_default(self):
137 return tuple(util.select_random_ports(2))
137 return tuple(util.select_random_ports(2))
138
138
139 control = Tuple(Integer,Integer,config=True,
139 control = Tuple(Integer,Integer,config=True,
140 help="""Engine/Client Port pair for Control queue""")
140 help="""Engine/Client Port pair for Control queue""")
141
141
142 def _control_default(self):
142 def _control_default(self):
143 return tuple(util.select_random_ports(2))
143 return tuple(util.select_random_ports(2))
144
144
145 iopub = Tuple(Integer,Integer,config=True,
145 iopub = Tuple(Integer,Integer,config=True,
146 help="""Engine/Client Port pair for IOPub relay""")
146 help="""Engine/Client Port pair for IOPub relay""")
147
147
148 def _iopub_default(self):
148 def _iopub_default(self):
149 return tuple(util.select_random_ports(2))
149 return tuple(util.select_random_ports(2))
150
150
151 # single ports:
151 # single ports:
152 mon_port = Integer(config=True,
152 mon_port = Integer(config=True,
153 help="""Monitor (SUB) port for queue traffic""")
153 help="""Monitor (SUB) port for queue traffic""")
154
154
155 def _mon_port_default(self):
155 def _mon_port_default(self):
156 return util.select_random_ports(1)[0]
156 return util.select_random_ports(1)[0]
157
157
158 notifier_port = Integer(config=True,
158 notifier_port = Integer(config=True,
159 help="""PUB port for sending engine status notifications""")
159 help="""PUB port for sending engine status notifications""")
160
160
161 def _notifier_port_default(self):
161 def _notifier_port_default(self):
162 return util.select_random_ports(1)[0]
162 return util.select_random_ports(1)[0]
163
163
164 engine_ip = Unicode('127.0.0.1', config=True,
164 engine_ip = Unicode('127.0.0.1', config=True,
165 help="IP on which to listen for engine connections. [default: loopback]")
165 help="IP on which to listen for engine connections. [default: loopback]")
166 engine_transport = Unicode('tcp', config=True,
166 engine_transport = Unicode('tcp', config=True,
167 help="0MQ transport for engine connections. [default: tcp]")
167 help="0MQ transport for engine connections. [default: tcp]")
168
168
169 client_ip = Unicode('127.0.0.1', config=True,
169 client_ip = Unicode('127.0.0.1', config=True,
170 help="IP on which to listen for client connections. [default: loopback]")
170 help="IP on which to listen for client connections. [default: loopback]")
171 client_transport = Unicode('tcp', config=True,
171 client_transport = Unicode('tcp', config=True,
172 help="0MQ transport for client connections. [default : tcp]")
172 help="0MQ transport for client connections. [default : tcp]")
173
173
174 monitor_ip = Unicode('127.0.0.1', config=True,
174 monitor_ip = Unicode('127.0.0.1', config=True,
175 help="IP on which to listen for monitor messages. [default: loopback]")
175 help="IP on which to listen for monitor messages. [default: loopback]")
176 monitor_transport = Unicode('tcp', config=True,
176 monitor_transport = Unicode('tcp', config=True,
177 help="0MQ transport for monitor messages. [default : tcp]")
177 help="0MQ transport for monitor messages. [default : tcp]")
178
178
179 monitor_url = Unicode('')
179 monitor_url = Unicode('')
180
180
181 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
181 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
182 config=True, help="""The class to use for the DB backend""")
182 config=True, help="""The class to use for the DB backend""")
183
183
184 # not configurable
184 # not configurable
185 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
185 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
186 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
186 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
187
187
188 def _ip_changed(self, name, old, new):
188 def _ip_changed(self, name, old, new):
189 self.engine_ip = new
189 self.engine_ip = new
190 self.client_ip = new
190 self.client_ip = new
191 self.monitor_ip = new
191 self.monitor_ip = new
192 self._update_monitor_url()
192 self._update_monitor_url()
193
193
194 def _update_monitor_url(self):
194 def _update_monitor_url(self):
195 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
195 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
196
196
197 def _transport_changed(self, name, old, new):
197 def _transport_changed(self, name, old, new):
198 self.engine_transport = new
198 self.engine_transport = new
199 self.client_transport = new
199 self.client_transport = new
200 self.monitor_transport = new
200 self.monitor_transport = new
201 self._update_monitor_url()
201 self._update_monitor_url()
202
202
203 def __init__(self, **kwargs):
203 def __init__(self, **kwargs):
204 super(HubFactory, self).__init__(**kwargs)
204 super(HubFactory, self).__init__(**kwargs)
205 self._update_monitor_url()
205 self._update_monitor_url()
206
206
207
207
208 def construct(self):
208 def construct(self):
209 self.init_hub()
209 self.init_hub()
210
210
211 def start(self):
211 def start(self):
212 self.heartmonitor.start()
212 self.heartmonitor.start()
213 self.log.info("Heartmonitor started")
213 self.log.info("Heartmonitor started")
214
214
215 def init_hub(self):
215 def init_hub(self):
216 """construct"""
216 """construct"""
217 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
217 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
218 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
218 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
219
219
220 ctx = self.context
220 ctx = self.context
221 loop = self.loop
221 loop = self.loop
222
222
223 # Registrar socket
223 # Registrar socket
224 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
224 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
225 q.bind(client_iface % self.regport)
225 q.bind(client_iface % self.regport)
226 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
226 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
227 if self.client_ip != self.engine_ip:
227 if self.client_ip != self.engine_ip:
228 q.bind(engine_iface % self.regport)
228 q.bind(engine_iface % self.regport)
229 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
229 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
230
230
231 ### Engine connections ###
231 ### Engine connections ###
232
232
233 # heartbeat
233 # heartbeat
234 hpub = ctx.socket(zmq.PUB)
234 hpub = ctx.socket(zmq.PUB)
235 hpub.bind(engine_iface % self.hb[0])
235 hpub.bind(engine_iface % self.hb[0])
236 hrep = ctx.socket(zmq.ROUTER)
236 hrep = ctx.socket(zmq.ROUTER)
237 hrep.bind(engine_iface % self.hb[1])
237 hrep.bind(engine_iface % self.hb[1])
238 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
238 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
239 pingstream=ZMQStream(hpub,loop),
239 pingstream=ZMQStream(hpub,loop),
240 pongstream=ZMQStream(hrep,loop)
240 pongstream=ZMQStream(hrep,loop)
241 )
241 )
242
242
243 ### Client connections ###
243 ### Client connections ###
244 # Notifier socket
244 # Notifier socket
245 n = ZMQStream(ctx.socket(zmq.PUB), loop)
245 n = ZMQStream(ctx.socket(zmq.PUB), loop)
246 n.bind(client_iface%self.notifier_port)
246 n.bind(client_iface%self.notifier_port)
247
247
248 ### build and launch the queues ###
248 ### build and launch the queues ###
249
249
250 # monitor socket
250 # monitor socket
251 sub = ctx.socket(zmq.SUB)
251 sub = ctx.socket(zmq.SUB)
252 sub.setsockopt(zmq.SUBSCRIBE, b"")
252 sub.setsockopt(zmq.SUBSCRIBE, b"")
253 sub.bind(self.monitor_url)
253 sub.bind(self.monitor_url)
254 sub.bind('inproc://monitor')
254 sub.bind('inproc://monitor')
255 sub = ZMQStream(sub, loop)
255 sub = ZMQStream(sub, loop)
256
256
257 # connect the db
257 # connect the db
258 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
258 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
259 # cdir = self.config.Global.cluster_dir
259 # cdir = self.config.Global.cluster_dir
260 self.db = import_item(str(self.db_class))(session=self.session.session,
260 self.db = import_item(str(self.db_class))(session=self.session.session,
261 config=self.config, log=self.log)
261 config=self.config, log=self.log)
262 time.sleep(.25)
262 time.sleep(.25)
263 try:
263 try:
264 scheme = self.config.TaskScheduler.scheme_name
264 scheme = self.config.TaskScheduler.scheme_name
265 except AttributeError:
265 except AttributeError:
266 from .scheduler import TaskScheduler
266 from .scheduler import TaskScheduler
267 scheme = TaskScheduler.scheme_name.get_default_value()
267 scheme = TaskScheduler.scheme_name.get_default_value()
268 # build connection dicts
268 # build connection dicts
269 self.engine_info = {
269 self.engine_info = {
270 'control' : engine_iface%self.control[1],
270 'control' : engine_iface%self.control[1],
271 'mux': engine_iface%self.mux[1],
271 'mux': engine_iface%self.mux[1],
272 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
272 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
273 'task' : engine_iface%self.task[1],
273 'task' : engine_iface%self.task[1],
274 'iopub' : engine_iface%self.iopub[1],
274 'iopub' : engine_iface%self.iopub[1],
275 # 'monitor' : engine_iface%self.mon_port,
275 # 'monitor' : engine_iface%self.mon_port,
276 }
276 }
277
277
278 self.client_info = {
278 self.client_info = {
279 'control' : client_iface%self.control[0],
279 'control' : client_iface%self.control[0],
280 'mux': client_iface%self.mux[0],
280 'mux': client_iface%self.mux[0],
281 'task' : (scheme, client_iface%self.task[0]),
281 'task' : (scheme, client_iface%self.task[0]),
282 'iopub' : client_iface%self.iopub[0],
282 'iopub' : client_iface%self.iopub[0],
283 'notification': client_iface%self.notifier_port
283 'notification': client_iface%self.notifier_port
284 }
284 }
285 self.log.debug("Hub engine addrs: %s", self.engine_info)
285 self.log.debug("Hub engine addrs: %s", self.engine_info)
286 self.log.debug("Hub client addrs: %s", self.client_info)
286 self.log.debug("Hub client addrs: %s", self.client_info)
287
287
288 # resubmit stream
288 # resubmit stream
289 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
289 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
290 url = util.disambiguate_url(self.client_info['task'][-1])
290 url = util.disambiguate_url(self.client_info['task'][-1])
291 r.setsockopt(zmq.IDENTITY, self.session.bsession)
291 r.setsockopt(zmq.IDENTITY, self.session.bsession)
292 r.connect(url)
292 r.connect(url)
293
293
294 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
294 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
295 query=q, notifier=n, resubmit=r, db=self.db,
295 query=q, notifier=n, resubmit=r, db=self.db,
296 engine_info=self.engine_info, client_info=self.client_info,
296 engine_info=self.engine_info, client_info=self.client_info,
297 log=self.log)
297 log=self.log)
298
298
299
299
300 class Hub(SessionFactory):
300 class Hub(SessionFactory):
301 """The IPython Controller Hub with 0MQ connections
301 """The IPython Controller Hub with 0MQ connections
302
302
303 Parameters
303 Parameters
304 ==========
304 ==========
305 loop: zmq IOLoop instance
305 loop: zmq IOLoop instance
306 session: Session object
306 session: Session object
307 <removed> context: zmq context for creating new connections (?)
307 <removed> context: zmq context for creating new connections (?)
308 queue: ZMQStream for monitoring the command queue (SUB)
308 queue: ZMQStream for monitoring the command queue (SUB)
309 query: ZMQStream for engine registration and client queries requests (XREP)
309 query: ZMQStream for engine registration and client queries requests (XREP)
310 heartbeat: HeartMonitor object checking the pulse of the engines
310 heartbeat: HeartMonitor object checking the pulse of the engines
311 notifier: ZMQStream for broadcasting engine registration changes (PUB)
311 notifier: ZMQStream for broadcasting engine registration changes (PUB)
312 db: connection to db for out of memory logging of commands
312 db: connection to db for out of memory logging of commands
313 NotImplemented
313 NotImplemented
314 engine_info: dict of zmq connection information for engines to connect
314 engine_info: dict of zmq connection information for engines to connect
315 to the queues.
315 to the queues.
316 client_info: dict of zmq connection information for engines to connect
316 client_info: dict of zmq connection information for engines to connect
317 to the queues.
317 to the queues.
318 """
318 """
319 # internal data structures:
319 # internal data structures:
320 ids=Set() # engine IDs
320 ids=Set() # engine IDs
321 keytable=Dict()
321 keytable=Dict()
322 by_ident=Dict()
322 by_ident=Dict()
323 engines=Dict()
323 engines=Dict()
324 clients=Dict()
324 clients=Dict()
325 hearts=Dict()
325 hearts=Dict()
326 pending=Set()
326 pending=Set()
327 queues=Dict() # pending msg_ids keyed by engine_id
327 queues=Dict() # pending msg_ids keyed by engine_id
328 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
328 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
329 completed=Dict() # completed msg_ids keyed by engine_id
329 completed=Dict() # completed msg_ids keyed by engine_id
330 all_completed=Set() # completed msg_ids keyed by engine_id
330 all_completed=Set() # completed msg_ids keyed by engine_id
331 dead_engines=Set() # completed msg_ids keyed by engine_id
331 dead_engines=Set() # completed msg_ids keyed by engine_id
332 unassigned=Set() # set of task msg_ds not yet assigned a destination
332 unassigned=Set() # set of task msg_ds not yet assigned a destination
333 incoming_registrations=Dict()
333 incoming_registrations=Dict()
334 registration_timeout=Integer()
334 registration_timeout=Integer()
335 _idcounter=Integer(0)
335 _idcounter=Integer(0)
336
336
337 # objects from constructor:
337 # objects from constructor:
338 query=Instance(ZMQStream)
338 query=Instance(ZMQStream)
339 monitor=Instance(ZMQStream)
339 monitor=Instance(ZMQStream)
340 notifier=Instance(ZMQStream)
340 notifier=Instance(ZMQStream)
341 resubmit=Instance(ZMQStream)
341 resubmit=Instance(ZMQStream)
342 heartmonitor=Instance(HeartMonitor)
342 heartmonitor=Instance(HeartMonitor)
343 db=Instance(object)
343 db=Instance(object)
344 client_info=Dict()
344 client_info=Dict()
345 engine_info=Dict()
345 engine_info=Dict()
346
346
347
347
348 def __init__(self, **kwargs):
348 def __init__(self, **kwargs):
349 """
349 """
350 # universal:
350 # universal:
351 loop: IOLoop for creating future connections
351 loop: IOLoop for creating future connections
352 session: streamsession for sending serialized data
352 session: streamsession for sending serialized data
353 # engine:
353 # engine:
354 queue: ZMQStream for monitoring queue messages
354 queue: ZMQStream for monitoring queue messages
355 query: ZMQStream for engine+client registration and client requests
355 query: ZMQStream for engine+client registration and client requests
356 heartbeat: HeartMonitor object for tracking engines
356 heartbeat: HeartMonitor object for tracking engines
357 # extra:
357 # extra:
358 db: ZMQStream for db connection (NotImplemented)
358 db: ZMQStream for db connection (NotImplemented)
359 engine_info: zmq address/protocol dict for engine connections
359 engine_info: zmq address/protocol dict for engine connections
360 client_info: zmq address/protocol dict for client connections
360 client_info: zmq address/protocol dict for client connections
361 """
361 """
362
362
363 super(Hub, self).__init__(**kwargs)
363 super(Hub, self).__init__(**kwargs)
364 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
364 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
365
365
366 # validate connection dicts:
366 # validate connection dicts:
367 for k,v in self.client_info.iteritems():
367 for k,v in self.client_info.iteritems():
368 if k == 'task':
368 if k == 'task':
369 util.validate_url_container(v[1])
369 util.validate_url_container(v[1])
370 else:
370 else:
371 util.validate_url_container(v)
371 util.validate_url_container(v)
372 # util.validate_url_container(self.client_info)
372 # util.validate_url_container(self.client_info)
373 util.validate_url_container(self.engine_info)
373 util.validate_url_container(self.engine_info)
374
374
375 # register our callbacks
375 # register our callbacks
376 self.query.on_recv(self.dispatch_query)
376 self.query.on_recv(self.dispatch_query)
377 self.monitor.on_recv(self.dispatch_monitor_traffic)
377 self.monitor.on_recv(self.dispatch_monitor_traffic)
378
378
379 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
379 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
380 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
380 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
381
381
382 self.monitor_handlers = {b'in' : self.save_queue_request,
382 self.monitor_handlers = {b'in' : self.save_queue_request,
383 b'out': self.save_queue_result,
383 b'out': self.save_queue_result,
384 b'intask': self.save_task_request,
384 b'intask': self.save_task_request,
385 b'outtask': self.save_task_result,
385 b'outtask': self.save_task_result,
386 b'tracktask': self.save_task_destination,
386 b'tracktask': self.save_task_destination,
387 b'incontrol': _passer,
387 b'incontrol': _passer,
388 b'outcontrol': _passer,
388 b'outcontrol': _passer,
389 b'iopub': self.save_iopub_message,
389 b'iopub': self.save_iopub_message,
390 }
390 }
391
391
392 self.query_handlers = {'queue_request': self.queue_status,
392 self.query_handlers = {'queue_request': self.queue_status,
393 'result_request': self.get_results,
393 'result_request': self.get_results,
394 'history_request': self.get_history,
394 'history_request': self.get_history,
395 'db_request': self.db_query,
395 'db_request': self.db_query,
396 'purge_request': self.purge_results,
396 'purge_request': self.purge_results,
397 'load_request': self.check_load,
397 'load_request': self.check_load,
398 'resubmit_request': self.resubmit_task,
398 'resubmit_request': self.resubmit_task,
399 'shutdown_request': self.shutdown_request,
399 'shutdown_request': self.shutdown_request,
400 'registration_request' : self.register_engine,
400 'registration_request' : self.register_engine,
401 'unregistration_request' : self.unregister_engine,
401 'unregistration_request' : self.unregister_engine,
402 'connection_request': self.connection_request,
402 'connection_request': self.connection_request,
403 }
403 }
404
404
405 # ignore resubmit replies
405 # ignore resubmit replies
406 self.resubmit.on_recv(lambda msg: None, copy=False)
406 self.resubmit.on_recv(lambda msg: None, copy=False)
407
407
408 self.log.info("hub::created hub")
408 self.log.info("hub::created hub")
409
409
410 @property
410 @property
411 def _next_id(self):
411 def _next_id(self):
412 """gemerate a new ID.
412 """gemerate a new ID.
413
413
414 No longer reuse old ids, just count from 0."""
414 No longer reuse old ids, just count from 0."""
415 newid = self._idcounter
415 newid = self._idcounter
416 self._idcounter += 1
416 self._idcounter += 1
417 return newid
417 return newid
418 # newid = 0
418 # newid = 0
419 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
419 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
420 # # print newid, self.ids, self.incoming_registrations
420 # # print newid, self.ids, self.incoming_registrations
421 # while newid in self.ids or newid in incoming:
421 # while newid in self.ids or newid in incoming:
422 # newid += 1
422 # newid += 1
423 # return newid
423 # return newid
424
424
425 #-----------------------------------------------------------------------------
425 #-----------------------------------------------------------------------------
426 # message validation
426 # message validation
427 #-----------------------------------------------------------------------------
427 #-----------------------------------------------------------------------------
428
428
429 def _validate_targets(self, targets):
429 def _validate_targets(self, targets):
430 """turn any valid targets argument into a list of integer ids"""
430 """turn any valid targets argument into a list of integer ids"""
431 if targets is None:
431 if targets is None:
432 # default to all
432 # default to all
433 targets = self.ids
433 return self.ids
434
434
435 if isinstance(targets, (int,str,unicode)):
435 if isinstance(targets, (int,str,unicode)):
436 # only one target specified
436 # only one target specified
437 targets = [targets]
437 targets = [targets]
438 _targets = []
438 _targets = []
439 for t in targets:
439 for t in targets:
440 # map raw identities to ids
440 # map raw identities to ids
441 if isinstance(t, (str,unicode)):
441 if isinstance(t, (str,unicode)):
442 t = self.by_ident.get(t, t)
442 t = self.by_ident.get(t, t)
443 _targets.append(t)
443 _targets.append(t)
444 targets = _targets
444 targets = _targets
445 bad_targets = [ t for t in targets if t not in self.ids ]
445 bad_targets = [ t for t in targets if t not in self.ids ]
446 if bad_targets:
446 if bad_targets:
447 raise IndexError("No Such Engine: %r" % bad_targets)
447 raise IndexError("No Such Engine: %r" % bad_targets)
448 if not targets:
448 if not targets:
449 raise IndexError("No Engines Registered")
449 raise IndexError("No Engines Registered")
450 return targets
450 return targets
451
451
452 #-----------------------------------------------------------------------------
452 #-----------------------------------------------------------------------------
453 # dispatch methods (1 per stream)
453 # dispatch methods (1 per stream)
454 #-----------------------------------------------------------------------------
454 #-----------------------------------------------------------------------------
455
455
456
456
457 def dispatch_monitor_traffic(self, msg):
457 def dispatch_monitor_traffic(self, msg):
458 """all ME and Task queue messages come through here, as well as
458 """all ME and Task queue messages come through here, as well as
459 IOPub traffic."""
459 IOPub traffic."""
460 self.log.debug("monitor traffic: %r", msg[:2])
460 self.log.debug("monitor traffic: %r", msg[0])
461 switch = msg[0]
461 switch = msg[0]
462 try:
462 try:
463 idents, msg = self.session.feed_identities(msg[1:])
463 idents, msg = self.session.feed_identities(msg[1:])
464 except ValueError:
464 except ValueError:
465 idents=[]
465 idents=[]
466 if not idents:
466 if not idents:
467 self.log.error("Bad Monitor Message: %r", msg)
467 self.log.error("Bad Monitor Message: %r", msg)
468 return
468 return
469 handler = self.monitor_handlers.get(switch, None)
469 handler = self.monitor_handlers.get(switch, None)
470 if handler is not None:
470 if handler is not None:
471 handler(idents, msg)
471 handler(idents, msg)
472 else:
472 else:
473 self.log.error("Invalid monitor topic: %r", switch)
473 self.log.error("Invalid monitor topic: %r", switch)
474
474
475
475
def dispatch_query(self, msg):
    """Route registration requests and queries from clients.

    Replies with a ``hub_error`` message to the client when the message
    cannot be parsed or has no registered handler.
    """
    try:
        idents, msg = self.session.feed_identities(msg)
    except ValueError:
        idents = []
    if not idents:
        self.log.error("Bad Query Message: %r", msg)
        return
    client_id = idents[0]
    try:
        msg = self.session.unserialize(msg, content=True)
    except Exception:
        content = error.wrap_exception()
        self.log.error("Bad Query Message: %r", msg, exc_info=True)
        self.session.send(self.query, "hub_error", ident=client_id,
                content=content)
        return
    # switch on message type:
    msg_type = msg['header']['msg_type']
    self.log.info("client::client %r requested %r", client_id, msg_type)
    handler = self.query_handlers.get(msg_type, None)
    if handler is None:
        # explicit raise/catch (instead of `assert`, which is stripped
        # under -O) so wrap_exception() captures a real traceback
        try:
            raise AssertionError("Bad Message Type: %r" % msg_type)
        except AssertionError:
            content = error.wrap_exception()
            self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
        return
    handler(idents, msg)
510
510
def dispatch_db(self, msg):
    """Placeholder for direct database traffic (not implemented)."""
    raise NotImplementedError
514
514
515 #---------------------------------------------------------------------------
515 #---------------------------------------------------------------------------
516 # handler methods (1 per event)
516 # handler methods (1 per event)
517 #---------------------------------------------------------------------------
517 #---------------------------------------------------------------------------
518
518
519 #----------------------- Heartbeat --------------------------------------
519 #----------------------- Heartbeat --------------------------------------
520
520
def handle_new_heart(self, heart):
    """handler to attach to heartbeater.

    Called when a new heart starts to beat: completes the pending
    registration for that heart, or ignores it if none is pending.
    """
    self.log.debug("heartbeat::handle_new_heart(%r)", heart)
    if heart in self.incoming_registrations:
        self.finish_registration(heart)
    else:
        self.log.info("heartbeat::ignoring new heart: %r", heart)
530
530
531
531
def handle_heart_failure(self, heart):
    """handler to attach to heartbeater.

    Called when a previously registered heart fails to respond to a
    beat request; triggers unregistration of the engine, unless the
    heart is unknown or the engine is already marked dead.
    """
    self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
    eid = self.hearts.get(heart, None)
    if eid is None or self.keytable[eid] in self.dead_engines:
        self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
    else:
        # only look up the queue once we know eid is a live engine;
        # doing it earlier raised KeyError for unknown hearts (eid=None)
        queue = self.engines[eid].queue
        self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
543
543
544 #----------------------- MUX Queue Traffic ------------------------------
544 #----------------------- MUX Queue Traffic ------------------------------
545
545
def save_queue_request(self, idents, msg):
    """Record a client request submitted to an engine via the MUX queue."""
    if len(idents) < 2:
        self.log.error("invalid identity prefix: %r", idents)
        return
    queue_id, client_id = idents[:2]
    try:
        msg = self.session.unserialize(msg)
    except Exception:
        self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
        return

    eid = self.by_ident.get(queue_id, None)
    if eid is None:
        self.log.error("queue::target %r not registered", queue_id)
        self.log.debug("queue:: valid are: %r", self.by_ident.keys())
        return
    record = init_record(msg)
    msg_id = record['msg_id']
    self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
    # identities are stored as unicode in records
    record['engine_uuid'] = queue_id.decode('ascii')
    record['client_uuid'] = client_id.decode('ascii')
    record['queue'] = 'mux'

    try:
        # iopub traffic may already have created a record for msg_id
        existing = self.db.get_record(msg_id)
        for key, old in existing.items():
            new = record.get(key, None)
            if old and new and old != new:
                self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, new, key, old)
            elif old and not new:
                # keep whatever iopub already stored
                record[key] = old
        try:
            self.db.update_record(msg_id, record)
        except Exception:
            self.log.error("DB Error updating record %r", msg_id, exc_info=True)
    except KeyError:
        # first time we hear of msg_id: create the record
        try:
            self.db.add_record(msg_id, record)
        except Exception:
            self.log.error("DB Error adding record %r", msg_id, exc_info=True)

    self.pending.add(msg_id)
    self.queues[eid].append(msg_id)
592
592
def save_queue_result(self, idents, msg):
    """Record the completion of a MUX-queue request in the db."""
    if len(idents) < 2:
        self.log.error("invalid identity prefix: %r", idents)
        return

    client_id, queue_id = idents[:2]
    try:
        msg = self.session.unserialize(msg)
    except Exception:
        self.log.error("queue::engine %r sent invalid message to %r: %r",
                queue_id, client_id, msg, exc_info=True)
        return

    eid = self.by_ident.get(queue_id, None)
    if eid is None:
        self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
        return

    parent = msg['parent_header']
    if not parent:
        return
    msg_id = parent['msg_id']
    if msg_id in self.pending:
        self.pending.remove(msg_id)
        self.all_completed.add(msg_id)
        self.queues[eid].remove(msg_id)
        self.completed[eid].append(msg_id)
        self.log.info("queue::request %r completed on %s", msg_id, eid)
    elif msg_id not in self.all_completed:
        # it could be a result from a dead engine that died before
        # delivering the result
        self.log.warn("queue:: unknown msg finished %r", msg_id)
        return
    # update the record even for duplicates, because the unregistration
    # could have been premature
    rheader = msg['header']
    result = {
        'result_header': rheader,
        'result_content': msg['content'],
        'started': rheader.get('started', None),
        'completed': rheader['date'],
        'result_buffers': msg['buffers'],
    }
    try:
        self.db.update_record(msg_id, result)
    except Exception:
        self.log.error("DB Error updating record %r", msg_id, exc_info=True)
642
642
643
643
644 #--------------------- Task Queue Traffic ------------------------------
644 #--------------------- Task Queue Traffic ------------------------------
645
645
def save_task_request(self, idents, msg):
    """Save the submission of a task."""
    client_id = idents[0]

    try:
        msg = self.session.unserialize(msg)
    except Exception:
        self.log.error("task::client %r sent invalid task message: %r",
                client_id, msg, exc_info=True)
        return
    record = init_record(msg)

    record['client_uuid'] = client_id.decode('ascii')
    record['queue'] = 'task'
    header = msg['header']
    msg_id = header['msg_id']
    self.pending.add(msg_id)
    self.unassigned.add(msg_id)
    try:
        # iopub traffic may already have created a record for msg_id
        existing = self.db.get_record(msg_id)
        if existing['resubmitted']:
            # don't clobber these keys on resubmit:
            # submitted and client_uuid should be different,
            # and buffers might be big and shouldn't have changed
            for key in ('submitted', 'client_uuid', 'buffers'):
                record.pop(key)
            # content and header are still compared below; they should
            # not change, and are cheap to compare (unlike buffers)

        for key, old in existing.items():
            if key.endswith('buffers'):
                # don't compare buffers
                continue
            new = record.get(key, None)
            if old and new and old != new:
                self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, new, key, old)
            elif old and not new:
                record[key] = old
        try:
            self.db.update_record(msg_id, record)
        except Exception:
            self.log.error("DB Error updating record %r", msg_id, exc_info=True)
    except KeyError:
        try:
            self.db.add_record(msg_id, record)
        except Exception:
            self.log.error("DB Error adding record %r", msg_id, exc_info=True)
    except Exception:
        self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
696
696
def save_task_result(self, idents, msg):
    """save the result of a completed task."""
    client_id = idents[0]
    try:
        msg = self.session.unserialize(msg)
    except Exception:
        self.log.error("task::invalid task result message send to %r: %r",
                client_id, msg, exc_info=True)
        return

    parent = msg['parent_header']
    if not parent:
        self.log.warn("Task %r had no parent!", msg)
        return
    msg_id = parent['msg_id']
    self.unassigned.discard(msg_id)

    header = msg['header']
    engine_uuid = header.get('engine', None)
    eid = self.by_ident.get(engine_uuid, None)

    if msg_id not in self.pending:
        self.log.debug("task::unknown task %r finished", msg_id)
        return

    self.log.info("task::task %r finished on %s", msg_id, eid)
    self.pending.remove(msg_id)
    self.all_completed.add(msg_id)
    if eid is not None:
        self.completed[eid].append(msg_id)
        if msg_id in self.tasks[eid]:
            self.tasks[eid].remove(msg_id)

    result = {
        'result_header': header,
        'result_content': msg['content'],
        'started': header.get('started', None),
        'completed': header['date'],
        'engine_uuid': engine_uuid,
        'result_buffers': msg['buffers'],
    }
    try:
        self.db.update_record(msg_id, result)
    except Exception:
        self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
746
746
def save_task_destination(self, idents, msg):
    """Record which engine the task scheduler assigned a task to."""
    try:
        msg = self.session.unserialize(msg, content=True)
    except Exception:
        self.log.error("task::invalid task tracking message", exc_info=True)
        return
    content = msg['content']
    msg_id = content['msg_id']
    engine_uuid = content['engine_id']
    eid = self.by_ident[util.asbytes(engine_uuid)]

    self.log.info("task::task %r arrived on %r", msg_id, eid)
    # the task is no longer unassigned (no-op if iopub beat us to it)
    self.unassigned.discard(msg_id)

    self.tasks[eid].append(msg_id)
    try:
        self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
    except Exception:
        self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
771
771
772
772
def mia_task_request(self, idents, msg):
    """Query for missing-in-action tasks (never implemented)."""
    raise NotImplementedError
778
778
779
779
780 #--------------------- IOPub Traffic ------------------------------
780 #--------------------- IOPub Traffic ------------------------------
781
781
def save_iopub_message(self, topics, msg):
    """save an iopub message into the db"""
    try:
        msg = self.session.unserialize(msg, content=True)
    except Exception:
        self.log.error("iopub::invalid IOPub message", exc_info=True)
        return

    parent = msg['parent_header']
    if not parent:
        self.log.error("iopub::invalid IOPub message: %r", msg)
        return
    msg_id = parent['msg_id']
    msg_type = msg['header']['msg_type']
    content = msg['content']

    # make sure a record for msg_id exists in the db
    try:
        rec = self.db.get_record(msg_id)
    except KeyError:
        rec = empty_record()
        rec['msg_id'] = msg_id
        self.db.add_record(msg_id, rec)

    update = {}
    if msg_type == 'stream':
        # append to any stream output already accumulated
        stream_name = content['name']
        existing = rec[stream_name] or ''
        update[stream_name] = existing + content['data']
    elif msg_type == 'pyerr':
        update['pyerr'] = content
    elif msg_type == 'pyin':
        update['pyin'] = content['code']
    else:
        update[msg_type] = content.get('data', '')

    try:
        self.db.update_record(msg_id, update)
    except Exception:
        self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
824
824
825
825
826
826
827 #-------------------------------------------------------------------------
827 #-------------------------------------------------------------------------
828 # Registration requests
828 # Registration requests
829 #-------------------------------------------------------------------------
829 #-------------------------------------------------------------------------
830
830
def connection_request(self, client_id, msg):
    """Reply with connection addresses for clients."""
    self.log.info("client::client %r connected", client_id)
    content = dict(status='ok')
    content.update(self.client_info)
    # map engine id -> uuid, json-friendly, excluding dead engines
    engines = {}
    for eid, uuid in self.keytable.items():
        if uuid not in self.dead_engines:
            engines[str(eid)] = uuid.decode('ascii')
    content['engines'] = engines
    self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
842
842
def register_engine(self, reg, msg):
    """Register a new engine.

    Validates that the requested queue id and heartbeat id are not
    already in use (registered or pending), replies to the engine with
    a ``registration_reply``, and stages the registration until the
    engine's heart is seen beating. Returns the new engine id.
    """
    content = msg['content']
    try:
        queue = util.asbytes(content['queue'])
    except KeyError:
        self.log.error("registration::queue not specified", exc_info=True)
        return
    heart = content.get('heartbeat', None)
    if heart:
        heart = util.asbytes(heart)
    eid = self._next_id

    self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)

    content = dict(id=eid, status='ok')
    content.update(self.engine_info)
    # reject duplicate queue/heartbeat ids; the raise/catch pattern lets
    # error.wrap_exception() capture a real traceback for the reply.
    # (narrowed from bare `except:` to the KeyError we deliberately raise)
    if queue in self.by_ident:
        try:
            raise KeyError("queue_id %r in use" % queue)
        except KeyError:
            content = error.wrap_exception()
            self.log.error("queue_id %r in use", queue, exc_info=True)
    elif heart in self.hearts:  # need to check unique hearts?
        try:
            raise KeyError("heart_id %r in use" % heart)
        except KeyError:
            self.log.error("heart_id %r in use", heart, exc_info=True)
            content = error.wrap_exception()
    else:
        # also check registrations that are still pending
        for h, pack in self.incoming_registrations.items():
            if heart == h:
                try:
                    raise KeyError("heart_id %r in use" % heart)
                except KeyError:
                    self.log.error("heart_id %r in use", heart, exc_info=True)
                    content = error.wrap_exception()
                break
            elif queue == pack[1]:
                try:
                    raise KeyError("queue_id %r in use" % queue)
                except KeyError:
                    self.log.error("queue_id %r in use", queue, exc_info=True)
                    content = error.wrap_exception()
                break

    self.session.send(self.query, "registration_reply",
            content=content,
            ident=reg)

    if content['status'] == 'ok':
        if heart in self.heartmonitor.hearts:
            # heart is already beating: finish registration immediately
            self.incoming_registrations[heart] = (eid, queue, reg[0], None)
            self.finish_registration(heart)
        else:
            # heart is not beating yet: stage the registration and purge
            # it if no beat arrives within the registration timeout
            purge = lambda: self._purge_stalled_registration(heart)
            dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
            dc.start()
            self.incoming_registrations[heart] = (eid, queue, reg[0], dc)
    else:
        self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
    return eid
909
909
def unregister_engine(self, ident, msg):
    """Unregister an engine that explicitly requested to leave.

    Marks the engine's uuid as dead (rather than deleting its state
    outright, so late results can still be matched up), schedules
    stranded-message handling after the registration timeout, and
    notifies subscribers.
    """
    try:
        eid = msg['content']['id']
    except Exception:
        # narrowed from bare `except:` — a malformed msg raises
        # KeyError/TypeError; don't swallow KeyboardInterrupt etc.
        self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
        return
    self.log.info("registration::unregister_engine(%r)", eid)
    uuid = self.keytable[eid]
    content = dict(id=eid, queue=uuid.decode('ascii'))
    self.dead_engines.add(uuid)
    # give in-flight results a grace period before declaring them lost
    handleit = lambda: self._handle_stranded_msgs(eid, uuid)
    dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
    dc.start()
    ############## TODO: HANDLE IT ################

    if self.notifier:
        self.session.send(self.notifier, "unregistration_notification", content=content)
936
936
def _handle_stranded_msgs(self, eid, uuid):
    """Handle messages known to be on an engine when the engine unregisters.

    It is possible that this will fire prematurely - that is, an engine will
    go down after completing a result, and the client will be notified
    that the result failed and later receive the actual result.
    """

    outstanding = self.queues[eid]

    for msg_id in outstanding:
        self.pending.remove(msg_id)
        self.all_completed.add(msg_id)
        # fabricate an error result on behalf of the dead engine;
        # raise/catch so wrap_exception() captures a real traceback
        try:
            raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
        except error.EngineError:
            content = error.wrap_exception()
        # build a fake header:
        header = {'engine': uuid, 'date': datetime.now()}
        rec = dict(result_content=content, result_header=header,
                result_buffers=[], completed=header['date'],
                engine_uuid=uuid)
        try:
            self.db.update_record(msg_id, rec)
        except Exception:
            self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
965
965
966
966
967 def finish_registration(self, heart):
967 def finish_registration(self, heart):
968 """Second half of engine registration, called after our HeartMonitor
968 """Second half of engine registration, called after our HeartMonitor
969 has received a beat from the Engine's Heart."""
969 has received a beat from the Engine's Heart."""
970 try:
970 try:
971 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
971 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
972 except KeyError:
972 except KeyError:
973 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
973 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
974 return
974 return
975 self.log.info("registration::finished registering engine %i:%r", eid, queue)
975 self.log.info("registration::finished registering engine %i:%r", eid, queue)
976 if purge is not None:
976 if purge is not None:
977 purge.stop()
977 purge.stop()
978 control = queue
978 control = queue
979 self.ids.add(eid)
979 self.ids.add(eid)
980 self.keytable[eid] = queue
980 self.keytable[eid] = queue
981 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
981 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
982 control=control, heartbeat=heart)
982 control=control, heartbeat=heart)
983 self.by_ident[queue] = eid
983 self.by_ident[queue] = eid
984 self.queues[eid] = list()
984 self.queues[eid] = list()
985 self.tasks[eid] = list()
985 self.tasks[eid] = list()
986 self.completed[eid] = list()
986 self.completed[eid] = list()
987 self.hearts[heart] = eid
987 self.hearts[heart] = eid
988 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
988 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
989 if self.notifier:
989 if self.notifier:
990 self.session.send(self.notifier, "registration_notification", content=content)
990 self.session.send(self.notifier, "registration_notification", content=content)
991 self.log.info("engine::Engine Connected: %i", eid)
991 self.log.info("engine::Engine Connected: %i", eid)
992
992
993 def _purge_stalled_registration(self, heart):
993 def _purge_stalled_registration(self, heart):
994 if heart in self.incoming_registrations:
994 if heart in self.incoming_registrations:
995 eid = self.incoming_registrations.pop(heart)[0]
995 eid = self.incoming_registrations.pop(heart)[0]
996 self.log.info("registration::purging stalled registration: %i", eid)
996 self.log.info("registration::purging stalled registration: %i", eid)
997 else:
997 else:
998 pass
998 pass
999
999
1000 #-------------------------------------------------------------------------
1000 #-------------------------------------------------------------------------
1001 # Client Requests
1001 # Client Requests
1002 #-------------------------------------------------------------------------
1002 #-------------------------------------------------------------------------
1003
1003
1004 def shutdown_request(self, client_id, msg):
1004 def shutdown_request(self, client_id, msg):
1005 """handle shutdown request."""
1005 """handle shutdown request."""
1006 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1006 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1007 # also notify other clients of shutdown
1007 # also notify other clients of shutdown
1008 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1008 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1009 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1009 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1010 dc.start()
1010 dc.start()
1011
1011
1012 def _shutdown(self):
1012 def _shutdown(self):
1013 self.log.info("hub::hub shutting down.")
1013 self.log.info("hub::hub shutting down.")
1014 time.sleep(0.1)
1014 time.sleep(0.1)
1015 sys.exit(0)
1015 sys.exit(0)
1016
1016
1017
1017
1018 def check_load(self, client_id, msg):
1018 def check_load(self, client_id, msg):
1019 content = msg['content']
1019 content = msg['content']
1020 try:
1020 try:
1021 targets = content['targets']
1021 targets = content['targets']
1022 targets = self._validate_targets(targets)
1022 targets = self._validate_targets(targets)
1023 except:
1023 except:
1024 content = error.wrap_exception()
1024 content = error.wrap_exception()
1025 self.session.send(self.query, "hub_error",
1025 self.session.send(self.query, "hub_error",
1026 content=content, ident=client_id)
1026 content=content, ident=client_id)
1027 return
1027 return
1028
1028
1029 content = dict(status='ok')
1029 content = dict(status='ok')
1030 # loads = {}
1030 # loads = {}
1031 for t in targets:
1031 for t in targets:
1032 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1032 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1033 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1033 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1034
1034
1035
1035
1036 def queue_status(self, client_id, msg):
1036 def queue_status(self, client_id, msg):
1037 """Return the Queue status of one or more targets.
1037 """Return the Queue status of one or more targets.
1038 if verbose: return the msg_ids
1038 if verbose: return the msg_ids
1039 else: return len of each type.
1039 else: return len of each type.
1040 keys: queue (pending MUX jobs)
1040 keys: queue (pending MUX jobs)
1041 tasks (pending Task jobs)
1041 tasks (pending Task jobs)
1042 completed (finished jobs from both queues)"""
1042 completed (finished jobs from both queues)"""
1043 content = msg['content']
1043 content = msg['content']
1044 targets = content['targets']
1044 targets = content['targets']
1045 try:
1045 try:
1046 targets = self._validate_targets(targets)
1046 targets = self._validate_targets(targets)
1047 except:
1047 except:
1048 content = error.wrap_exception()
1048 content = error.wrap_exception()
1049 self.session.send(self.query, "hub_error",
1049 self.session.send(self.query, "hub_error",
1050 content=content, ident=client_id)
1050 content=content, ident=client_id)
1051 return
1051 return
1052 verbose = content.get('verbose', False)
1052 verbose = content.get('verbose', False)
1053 content = dict(status='ok')
1053 content = dict(status='ok')
1054 for t in targets:
1054 for t in targets:
1055 queue = self.queues[t]
1055 queue = self.queues[t]
1056 completed = self.completed[t]
1056 completed = self.completed[t]
1057 tasks = self.tasks[t]
1057 tasks = self.tasks[t]
1058 if not verbose:
1058 if not verbose:
1059 queue = len(queue)
1059 queue = len(queue)
1060 completed = len(completed)
1060 completed = len(completed)
1061 tasks = len(tasks)
1061 tasks = len(tasks)
1062 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1062 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1063 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1063 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1064 # print (content)
1064 # print (content)
1065 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1065 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1066
1066
1067 def purge_results(self, client_id, msg):
1067 def purge_results(self, client_id, msg):
1068 """Purge results from memory. This method is more valuable before we move
1068 """Purge results from memory. This method is more valuable before we move
1069 to a DB based message storage mechanism."""
1069 to a DB based message storage mechanism."""
1070 content = msg['content']
1070 content = msg['content']
1071 self.log.info("Dropping records with %s", content)
1071 self.log.info("Dropping records with %s", content)
1072 msg_ids = content.get('msg_ids', [])
1072 msg_ids = content.get('msg_ids', [])
1073 reply = dict(status='ok')
1073 reply = dict(status='ok')
1074 if msg_ids == 'all':
1074 if msg_ids == 'all':
1075 try:
1075 try:
1076 self.db.drop_matching_records(dict(completed={'$ne':None}))
1076 self.db.drop_matching_records(dict(completed={'$ne':None}))
1077 except Exception:
1077 except Exception:
1078 reply = error.wrap_exception()
1078 reply = error.wrap_exception()
1079 else:
1079 else:
1080 pending = filter(lambda m: m in self.pending, msg_ids)
1080 pending = filter(lambda m: m in self.pending, msg_ids)
1081 if pending:
1081 if pending:
1082 try:
1082 try:
1083 raise IndexError("msg pending: %r" % pending[0])
1083 raise IndexError("msg pending: %r" % pending[0])
1084 except:
1084 except:
1085 reply = error.wrap_exception()
1085 reply = error.wrap_exception()
1086 else:
1086 else:
1087 try:
1087 try:
1088 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1088 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1089 except Exception:
1089 except Exception:
1090 reply = error.wrap_exception()
1090 reply = error.wrap_exception()
1091
1091
1092 if reply['status'] == 'ok':
1092 if reply['status'] == 'ok':
1093 eids = content.get('engine_ids', [])
1093 eids = content.get('engine_ids', [])
1094 for eid in eids:
1094 for eid in eids:
1095 if eid not in self.engines:
1095 if eid not in self.engines:
1096 try:
1096 try:
1097 raise IndexError("No such engine: %i" % eid)
1097 raise IndexError("No such engine: %i" % eid)
1098 except:
1098 except:
1099 reply = error.wrap_exception()
1099 reply = error.wrap_exception()
1100 break
1100 break
1101 uid = self.engines[eid].queue
1101 uid = self.engines[eid].queue
1102 try:
1102 try:
1103 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1103 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1104 except Exception:
1104 except Exception:
1105 reply = error.wrap_exception()
1105 reply = error.wrap_exception()
1106 break
1106 break
1107
1107
1108 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1108 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1109
1109
1110 def resubmit_task(self, client_id, msg):
1110 def resubmit_task(self, client_id, msg):
1111 """Resubmit one or more tasks."""
1111 """Resubmit one or more tasks."""
1112 def finish(reply):
1112 def finish(reply):
1113 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1113 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1114
1114
1115 content = msg['content']
1115 content = msg['content']
1116 msg_ids = content['msg_ids']
1116 msg_ids = content['msg_ids']
1117 reply = dict(status='ok')
1117 reply = dict(status='ok')
1118 try:
1118 try:
1119 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1119 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1120 'header', 'content', 'buffers'])
1120 'header', 'content', 'buffers'])
1121 except Exception:
1121 except Exception:
1122 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1122 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1123 return finish(error.wrap_exception())
1123 return finish(error.wrap_exception())
1124
1124
1125 # validate msg_ids
1125 # validate msg_ids
1126 found_ids = [ rec['msg_id'] for rec in records ]
1126 found_ids = [ rec['msg_id'] for rec in records ]
1127 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1127 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1128 if len(records) > len(msg_ids):
1128 if len(records) > len(msg_ids):
1129 try:
1129 try:
1130 raise RuntimeError("DB appears to be in an inconsistent state."
1130 raise RuntimeError("DB appears to be in an inconsistent state."
1131 "More matching records were found than should exist")
1131 "More matching records were found than should exist")
1132 except Exception:
1132 except Exception:
1133 return finish(error.wrap_exception())
1133 return finish(error.wrap_exception())
1134 elif len(records) < len(msg_ids):
1134 elif len(records) < len(msg_ids):
1135 missing = [ m for m in msg_ids if m not in found_ids ]
1135 missing = [ m for m in msg_ids if m not in found_ids ]
1136 try:
1136 try:
1137 raise KeyError("No such msg(s): %r" % missing)
1137 raise KeyError("No such msg(s): %r" % missing)
1138 except KeyError:
1138 except KeyError:
1139 return finish(error.wrap_exception())
1139 return finish(error.wrap_exception())
1140 elif invalid_ids:
1140 elif invalid_ids:
1141 msg_id = invalid_ids[0]
1141 msg_id = invalid_ids[0]
1142 try:
1142 try:
1143 raise ValueError("Task %r appears to be inflight" % msg_id)
1143 raise ValueError("Task %r appears to be inflight" % msg_id)
1144 except Exception:
1144 except Exception:
1145 return finish(error.wrap_exception())
1145 return finish(error.wrap_exception())
1146
1146
1147 # clear the existing records
1147 # clear the existing records
1148 now = datetime.now()
1148 now = datetime.now()
1149 rec = empty_record()
1149 rec = empty_record()
1150 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1150 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1151 rec['resubmitted'] = now
1151 rec['resubmitted'] = now
1152 rec['queue'] = 'task'
1152 rec['queue'] = 'task'
1153 rec['client_uuid'] = client_id[0]
1153 rec['client_uuid'] = client_id[0]
1154 try:
1154 try:
1155 for msg_id in msg_ids:
1155 for msg_id in msg_ids:
1156 self.all_completed.discard(msg_id)
1156 self.all_completed.discard(msg_id)
1157 self.db.update_record(msg_id, rec)
1157 self.db.update_record(msg_id, rec)
1158 except Exception:
1158 except Exception:
1159 self.log.error('db::db error upating record', exc_info=True)
1159 self.log.error('db::db error upating record', exc_info=True)
1160 reply = error.wrap_exception()
1160 reply = error.wrap_exception()
1161 else:
1161 else:
1162 # send the messages
1162 # send the messages
1163 for rec in records:
1163 for rec in records:
1164 header = rec['header']
1164 header = rec['header']
1165 # include resubmitted in header to prevent digest collision
1165 # include resubmitted in header to prevent digest collision
1166 header['resubmitted'] = now
1166 header['resubmitted'] = now
1167 msg = self.session.msg(header['msg_type'])
1167 msg = self.session.msg(header['msg_type'])
1168 msg['content'] = rec['content']
1168 msg['content'] = rec['content']
1169 msg['header'] = header
1169 msg['header'] = header
1170 msg['header']['msg_id'] = rec['msg_id']
1170 msg['header']['msg_id'] = rec['msg_id']
1171 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1171 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1172
1172
1173 finish(dict(status='ok'))
1173 finish(dict(status='ok'))
1174
1174
1175
1175
1176 def _extract_record(self, rec):
1176 def _extract_record(self, rec):
1177 """decompose a TaskRecord dict into subsection of reply for get_result"""
1177 """decompose a TaskRecord dict into subsection of reply for get_result"""
1178 io_dict = {}
1178 io_dict = {}
1179 for key in 'pyin pyout pyerr stdout stderr'.split():
1179 for key in 'pyin pyout pyerr stdout stderr'.split():
1180 io_dict[key] = rec[key]
1180 io_dict[key] = rec[key]
1181 content = { 'result_content': rec['result_content'],
1181 content = { 'result_content': rec['result_content'],
1182 'header': rec['header'],
1182 'header': rec['header'],
1183 'result_header' : rec['result_header'],
1183 'result_header' : rec['result_header'],
1184 'io' : io_dict,
1184 'io' : io_dict,
1185 }
1185 }
1186 if rec['result_buffers']:
1186 if rec['result_buffers']:
1187 buffers = map(bytes, rec['result_buffers'])
1187 buffers = map(bytes, rec['result_buffers'])
1188 else:
1188 else:
1189 buffers = []
1189 buffers = []
1190
1190
1191 return content, buffers
1191 return content, buffers
1192
1192
1193 def get_results(self, client_id, msg):
1193 def get_results(self, client_id, msg):
1194 """Get the result of 1 or more messages."""
1194 """Get the result of 1 or more messages."""
1195 content = msg['content']
1195 content = msg['content']
1196 msg_ids = sorted(set(content['msg_ids']))
1196 msg_ids = sorted(set(content['msg_ids']))
1197 statusonly = content.get('status_only', False)
1197 statusonly = content.get('status_only', False)
1198 pending = []
1198 pending = []
1199 completed = []
1199 completed = []
1200 content = dict(status='ok')
1200 content = dict(status='ok')
1201 content['pending'] = pending
1201 content['pending'] = pending
1202 content['completed'] = completed
1202 content['completed'] = completed
1203 buffers = []
1203 buffers = []
1204 if not statusonly:
1204 if not statusonly:
1205 try:
1205 try:
1206 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1206 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1207 # turn match list into dict, for faster lookup
1207 # turn match list into dict, for faster lookup
1208 records = {}
1208 records = {}
1209 for rec in matches:
1209 for rec in matches:
1210 records[rec['msg_id']] = rec
1210 records[rec['msg_id']] = rec
1211 except Exception:
1211 except Exception:
1212 content = error.wrap_exception()
1212 content = error.wrap_exception()
1213 self.session.send(self.query, "result_reply", content=content,
1213 self.session.send(self.query, "result_reply", content=content,
1214 parent=msg, ident=client_id)
1214 parent=msg, ident=client_id)
1215 return
1215 return
1216 else:
1216 else:
1217 records = {}
1217 records = {}
1218 for msg_id in msg_ids:
1218 for msg_id in msg_ids:
1219 if msg_id in self.pending:
1219 if msg_id in self.pending:
1220 pending.append(msg_id)
1220 pending.append(msg_id)
1221 elif msg_id in self.all_completed:
1221 elif msg_id in self.all_completed:
1222 completed.append(msg_id)
1222 completed.append(msg_id)
1223 if not statusonly:
1223 if not statusonly:
1224 c,bufs = self._extract_record(records[msg_id])
1224 c,bufs = self._extract_record(records[msg_id])
1225 content[msg_id] = c
1225 content[msg_id] = c
1226 buffers.extend(bufs)
1226 buffers.extend(bufs)
1227 elif msg_id in records:
1227 elif msg_id in records:
1228 if rec['completed']:
1228 if rec['completed']:
1229 completed.append(msg_id)
1229 completed.append(msg_id)
1230 c,bufs = self._extract_record(records[msg_id])
1230 c,bufs = self._extract_record(records[msg_id])
1231 content[msg_id] = c
1231 content[msg_id] = c
1232 buffers.extend(bufs)
1232 buffers.extend(bufs)
1233 else:
1233 else:
1234 pending.append(msg_id)
1234 pending.append(msg_id)
1235 else:
1235 else:
1236 try:
1236 try:
1237 raise KeyError('No such message: '+msg_id)
1237 raise KeyError('No such message: '+msg_id)
1238 except:
1238 except:
1239 content = error.wrap_exception()
1239 content = error.wrap_exception()
1240 break
1240 break
1241 self.session.send(self.query, "result_reply", content=content,
1241 self.session.send(self.query, "result_reply", content=content,
1242 parent=msg, ident=client_id,
1242 parent=msg, ident=client_id,
1243 buffers=buffers)
1243 buffers=buffers)
1244
1244
1245 def get_history(self, client_id, msg):
1245 def get_history(self, client_id, msg):
1246 """Get a list of all msg_ids in our DB records"""
1246 """Get a list of all msg_ids in our DB records"""
1247 try:
1247 try:
1248 msg_ids = self.db.get_history()
1248 msg_ids = self.db.get_history()
1249 except Exception as e:
1249 except Exception as e:
1250 content = error.wrap_exception()
1250 content = error.wrap_exception()
1251 else:
1251 else:
1252 content = dict(status='ok', history=msg_ids)
1252 content = dict(status='ok', history=msg_ids)
1253
1253
1254 self.session.send(self.query, "history_reply", content=content,
1254 self.session.send(self.query, "history_reply", content=content,
1255 parent=msg, ident=client_id)
1255 parent=msg, ident=client_id)
1256
1256
1257 def db_query(self, client_id, msg):
1257 def db_query(self, client_id, msg):
1258 """Perform a raw query on the task record database."""
1258 """Perform a raw query on the task record database."""
1259 content = msg['content']
1259 content = msg['content']
1260 query = content.get('query', {})
1260 query = content.get('query', {})
1261 keys = content.get('keys', None)
1261 keys = content.get('keys', None)
1262 buffers = []
1262 buffers = []
1263 empty = list()
1263 empty = list()
1264 try:
1264 try:
1265 records = self.db.find_records(query, keys)
1265 records = self.db.find_records(query, keys)
1266 except Exception as e:
1266 except Exception as e:
1267 content = error.wrap_exception()
1267 content = error.wrap_exception()
1268 else:
1268 else:
1269 # extract buffers from reply content:
1269 # extract buffers from reply content:
1270 if keys is not None:
1270 if keys is not None:
1271 buffer_lens = [] if 'buffers' in keys else None
1271 buffer_lens = [] if 'buffers' in keys else None
1272 result_buffer_lens = [] if 'result_buffers' in keys else None
1272 result_buffer_lens = [] if 'result_buffers' in keys else None
1273 else:
1273 else:
1274 buffer_lens = []
1274 buffer_lens = None
1275 result_buffer_lens = []
1275 result_buffer_lens = None
1276
1276
1277 for rec in records:
1277 for rec in records:
1278 # buffers may be None, so double check
1278 # buffers may be None, so double check
1279 b = rec.pop('buffers', empty) or empty
1279 if buffer_lens is not None:
1280 if buffer_lens is not None:
1280 b = rec.pop('buffers', empty) or empty
1281 buffer_lens.append(len(b))
1281 buffer_lens.append(len(b))
1282 buffers.extend(b)
1282 buffers.extend(b)
1283 rb = rec.pop('result_buffers', empty) or empty
1283 if result_buffer_lens is not None:
1284 if result_buffer_lens is not None:
1284 rb = rec.pop('result_buffers', empty) or empty
1285 result_buffer_lens.append(len(rb))
1285 result_buffer_lens.append(len(rb))
1286 buffers.extend(rb)
1286 buffers.extend(rb)
1287 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1287 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1288 result_buffer_lens=result_buffer_lens)
1288 result_buffer_lens=result_buffer_lens)
1289 # self.log.debug (content)
1289 # self.log.debug (content)
1290 self.session.send(self.query, "db_reply", content=content,
1290 self.session.send(self.query, "db_reply", content=content,
1291 parent=msg, ident=client_id,
1291 parent=msg, ident=client_id,
1292 buffers=buffers)
1292 buffers=buffers)
1293
1293
@@ -1,732 +1,738 b''
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6
6
7 Authors:
7 Authors:
8
8
9 * Min RK
9 * Min RK
10 """
10 """
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Copyright (C) 2010-2011 The IPython Development Team
12 # Copyright (C) 2010-2011 The IPython Development Team
13 #
13 #
14 # Distributed under the terms of the BSD License. The full license is in
14 # Distributed under the terms of the BSD License. The full license is in
15 # the file COPYING, distributed as part of this software.
15 # the file COPYING, distributed as part of this software.
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 #----------------------------------------------------------------------
18 #----------------------------------------------------------------------
19 # Imports
19 # Imports
20 #----------------------------------------------------------------------
20 #----------------------------------------------------------------------
21
21
22 from __future__ import print_function
22 from __future__ import print_function
23
23
24 import logging
24 import logging
25 import sys
25 import sys
26
26
27 from datetime import datetime, timedelta
27 from datetime import datetime, timedelta
28 from random import randint, random
28 from random import randint, random
29 from types import FunctionType
29 from types import FunctionType
30
30
31 try:
31 try:
32 import numpy
32 import numpy
33 except ImportError:
33 except ImportError:
34 numpy = None
34 numpy = None
35
35
36 import zmq
36 import zmq
37 from zmq.eventloop import ioloop, zmqstream
37 from zmq.eventloop import ioloop, zmqstream
38
38
39 # local imports
39 # local imports
40 from IPython.external.decorator import decorator
40 from IPython.external.decorator import decorator
41 from IPython.config.application import Application
41 from IPython.config.application import Application
42 from IPython.config.loader import Config
42 from IPython.config.loader import Config
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
43 from IPython.utils.traitlets import Instance, Dict, List, Set, Integer, Enum, CBytes
44
44
45 from IPython.parallel import error
45 from IPython.parallel import error
46 from IPython.parallel.factory import SessionFactory
46 from IPython.parallel.factory import SessionFactory
47 from IPython.parallel.util import connect_logger, local_logger, asbytes
47 from IPython.parallel.util import connect_logger, local_logger, asbytes
48
48
49 from .dependency import Dependency
49 from .dependency import Dependency
50
50
51 @decorator
51 @decorator
52 def logged(f,self,*args,**kwargs):
52 def logged(f,self,*args,**kwargs):
53 # print ("#--------------------")
53 # print ("#--------------------")
54 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
54 self.log.debug("scheduler::%s(*%s,**%s)", f.func_name, args, kwargs)
55 # print ("#--")
55 # print ("#--")
56 return f(self,*args, **kwargs)
56 return f(self,*args, **kwargs)
57
57
58 #----------------------------------------------------------------------
58 #----------------------------------------------------------------------
59 # Chooser functions
59 # Chooser functions
60 #----------------------------------------------------------------------
60 #----------------------------------------------------------------------
61
61
62 def plainrandom(loads):
62 def plainrandom(loads):
63 """Plain random pick."""
63 """Plain random pick."""
64 n = len(loads)
64 n = len(loads)
65 return randint(0,n-1)
65 return randint(0,n-1)
66
66
67 def lru(loads):
67 def lru(loads):
68 """Always pick the front of the line.
68 """Always pick the front of the line.
69
69
70 The content of `loads` is ignored.
70 The content of `loads` is ignored.
71
71
72 Assumes LRU ordering of loads, with oldest first.
72 Assumes LRU ordering of loads, with oldest first.
73 """
73 """
74 return 0
74 return 0
75
75
76 def twobin(loads):
76 def twobin(loads):
77 """Pick two at random, use the LRU of the two.
77 """Pick two at random, use the LRU of the two.
78
78
79 The content of loads is ignored.
79 The content of loads is ignored.
80
80
81 Assumes LRU ordering of loads, with oldest first.
81 Assumes LRU ordering of loads, with oldest first.
82 """
82 """
83 n = len(loads)
83 n = len(loads)
84 a = randint(0,n-1)
84 a = randint(0,n-1)
85 b = randint(0,n-1)
85 b = randint(0,n-1)
86 return min(a,b)
86 return min(a,b)
87
87
88 def weighted(loads):
88 def weighted(loads):
89 """Pick two at random using inverse load as weight.
89 """Pick two at random using inverse load as weight.
90
90
91 Return the less loaded of the two.
91 Return the less loaded of the two.
92 """
92 """
93 # weight 0 a million times more than 1:
93 # weight 0 a million times more than 1:
94 weights = 1./(1e-6+numpy.array(loads))
94 weights = 1./(1e-6+numpy.array(loads))
95 sums = weights.cumsum()
95 sums = weights.cumsum()
96 t = sums[-1]
96 t = sums[-1]
97 x = random()*t
97 x = random()*t
98 y = random()*t
98 y = random()*t
99 idx = 0
99 idx = 0
100 idy = 0
100 idy = 0
101 while sums[idx] < x:
101 while sums[idx] < x:
102 idx += 1
102 idx += 1
103 while sums[idy] < y:
103 while sums[idy] < y:
104 idy += 1
104 idy += 1
105 if weights[idy] > weights[idx]:
105 if weights[idy] > weights[idx]:
106 return idy
106 return idy
107 else:
107 else:
108 return idx
108 return idx
109
109
110 def leastload(loads):
110 def leastload(loads):
111 """Always choose the lowest load.
111 """Always choose the lowest load.
112
112
113 If the lowest load occurs more than once, the first
113 If the lowest load occurs more than once, the first
114 occurance will be used. If loads has LRU ordering, this means
114 occurance will be used. If loads has LRU ordering, this means
115 the LRU of those with the lowest load is chosen.
115 the LRU of those with the lowest load is chosen.
116 """
116 """
117 return loads.index(min(loads))
117 return loads.index(min(loads))
118
118
119 #---------------------------------------------------------------------
119 #---------------------------------------------------------------------
120 # Classes
120 # Classes
121 #---------------------------------------------------------------------
121 #---------------------------------------------------------------------
122 # store empty default dependency:
122 # store empty default dependency:
123 MET = Dependency([])
123 MET = Dependency([])
124
124
125 class TaskScheduler(SessionFactory):
125 class TaskScheduler(SessionFactory):
126 """Python TaskScheduler object.
126 """Python TaskScheduler object.
127
127
128 This is the simplest object that supports msg_id based
128 This is the simplest object that supports msg_id based
129 DAG dependencies. *Only* task msg_ids are checked, not
129 DAG dependencies. *Only* task msg_ids are checked, not
130 msg_ids of jobs submitted via the MUX queue.
130 msg_ids of jobs submitted via the MUX queue.
131
131
132 """
132 """
133
133
    # Per-engine cap on outstanding (submitted-but-unfinished) tasks.
    # 0 disables the limit; see the help text for the trade-offs.
    hwm = Integer(1, config=True,
        help="""specify the High Water Mark (HWM) for the downstream
        socket in the Task scheduler. This is the maximum number
        of allowed outstanding tasks on each engine.

        The default (1) means that only one task can be outstanding on each
        engine. Setting TaskScheduler.hwm=0 means there is no limit, and the
        engines continue to be assigned tasks while they are working,
        effectively hiding network latency behind computation, but can result
        in an imbalance of work when submitting many heterogenous tasks all at
        once. Any positive value greater than one is a compromise between the
        two.

        """
    )
    # Name of the load-balancing scheme.  Assigning a new name swaps the
    # actual scheme function via _scheme_name_changed below.
    scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
        'leastload', config=True, allow_none=False,
        help="""select the task scheduler scheme [default: Python LRU]
        Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
    )
154 def _scheme_name_changed(self, old, new):
154 def _scheme_name_changed(self, old, new):
155 self.log.debug("Using scheme %r"%new)
155 self.log.debug("Using scheme %r"%new)
156 self.scheme = globals()[new]
156 self.scheme = globals()[new]
157
157
    # input arguments:
    scheme = Instance(FunctionType) # function for determining the destination
    def _scheme_default(self):
        # default load-balancing scheme is the module-level leastload function
        return leastload
    client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
    engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
    notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
    mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream

    # internals:
    graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
    retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
    # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
    depending = Dict() # dict by msg_id of [raw_msg, targets, after, follow, timeout]
    pending = Dict() # dict by engine_uuid of submitted tasks
    completed = Dict() # dict by engine_uuid of completed tasks
    failed = Dict() # dict by engine_uuid of failed tasks
    destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
    clients = Dict() # dict by msg_id for who submitted the task
    targets = List() # list of target IDENTs
    loads = List() # list of engine loads (parallel to targets)
    # full = Set() # set of IDENTs that have HWM outstanding tasks
    all_completed = Set() # set of all completed tasks
    all_failed = Set() # set of all failed tasks
    all_done = Set() # set of all finished tasks=union(completed,failed)
    all_ids = Set() # set of all submitted task IDs
    blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
    auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')

    ident = CBytes() # ZMQ identity. This should just be self.session.session
                     # but ensure Bytes
    def _ident_default(self):
        # bsession is the bytes form of the session id
        return self.session.bsession
191
191
    def start(self):
        """Install stream handlers and begin scheduling.

        Submissions are accepted even before any engine has registered;
        tasks that cannot run yet are held by maybe_run/save_unmet.
        """
        self.engine_stream.on_recv(self.dispatch_result, copy=False)
        self.client_stream.on_recv(self.dispatch_submission, copy=False)

        self._notification_handlers = dict(
            registration_notification = self._register_engine,
            unregistration_notification = self._unregister_engine
        )
        self.notifier_stream.on_recv(self.dispatch_notification)
        # audit dependency timeouts every 2 seconds (period is in ms)
        self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop)
        self.auditor.start()
        self.log.info("Scheduler started [%s]"%self.scheme_name)
202
204
203 def resume_receiving(self):
205 def resume_receiving(self):
204 """Resume accepting jobs."""
206 """Resume accepting jobs."""
205 self.client_stream.on_recv(self.dispatch_submission, copy=False)
207 self.client_stream.on_recv(self.dispatch_submission, copy=False)
206
208
207 def stop_receiving(self):
209 def stop_receiving(self):
208 """Stop accepting jobs while there are no engines.
210 """Stop accepting jobs while there are no engines.
209 Leave them in the ZMQ queue."""
211 Leave them in the ZMQ queue."""
210 self.client_stream.on_recv(None)
212 self.client_stream.on_recv(None)
211
213
212 #-----------------------------------------------------------------------
214 #-----------------------------------------------------------------------
213 # [Un]Registration Handling
215 # [Un]Registration Handling
214 #-----------------------------------------------------------------------
216 #-----------------------------------------------------------------------
215
217
    def dispatch_notification(self, msg):
        """dispatch register/unregister events.

        Receives Hub notifications from the notifier SUB stream, validates
        and unpacks them, and routes them by msg_type to the handlers set
        up in start() (_register_engine / _unregister_engine).
        """
        try:
            # split the routing prefix from the message frames
            idents,msg = self.session.feed_identities(msg)
        except ValueError:
            self.log.warn("task::Invalid Message: %r",msg)
            return
        try:
            msg = self.session.unserialize(msg)
        except ValueError:
            # signature/format check failed
            self.log.warn("task::Unauthorized message from: %r"%idents)
            return

        msg_type = msg['header']['msg_type']

        handler = self._notification_handlers.get(msg_type, None)
        if handler is None:
            self.log.error("Unhandled message type: %r"%msg_type)
        else:
            try:
                # content['queue'] carries the engine identity; asbytes
                # ensures a bytes ident for ZMQ routing
                handler(asbytes(msg['content']['queue']))
            except Exception:
                self.log.error("task::Invalid notification msg: %r", msg, exc_info=True)
239
241
240 def _register_engine(self, uid):
242 def _register_engine(self, uid):
241 """New engine with ident `uid` became available."""
243 """New engine with ident `uid` became available."""
242 # head of the line:
244 # head of the line:
243 self.targets.insert(0,uid)
245 self.targets.insert(0,uid)
244 self.loads.insert(0,0)
246 self.loads.insert(0,0)
245
247
246 # initialize sets
248 # initialize sets
247 self.completed[uid] = set()
249 self.completed[uid] = set()
248 self.failed[uid] = set()
250 self.failed[uid] = set()
249 self.pending[uid] = {}
251 self.pending[uid] = {}
250 if len(self.targets) == 1:
252
251 self.resume_receiving()
252 # rescan the graph:
253 # rescan the graph:
253 self.update_graph(None)
254 self.update_graph(None)
254
255
255 def _unregister_engine(self, uid):
256 def _unregister_engine(self, uid):
256 """Existing engine with ident `uid` became unavailable."""
257 """Existing engine with ident `uid` became unavailable."""
257 if len(self.targets) == 1:
258 if len(self.targets) == 1:
258 # this was our only engine
259 # this was our only engine
259 self.stop_receiving()
260 pass
260
261
261 # handle any potentially finished tasks:
262 # handle any potentially finished tasks:
262 self.engine_stream.flush()
263 self.engine_stream.flush()
263
264
264 # don't pop destinations, because they might be used later
265 # don't pop destinations, because they might be used later
265 # map(self.destinations.pop, self.completed.pop(uid))
266 # map(self.destinations.pop, self.completed.pop(uid))
266 # map(self.destinations.pop, self.failed.pop(uid))
267 # map(self.destinations.pop, self.failed.pop(uid))
267
268
268 # prevent this engine from receiving work
269 # prevent this engine from receiving work
269 idx = self.targets.index(uid)
270 idx = self.targets.index(uid)
270 self.targets.pop(idx)
271 self.targets.pop(idx)
271 self.loads.pop(idx)
272 self.loads.pop(idx)
272
273
273 # wait 5 seconds before cleaning up pending jobs, since the results might
274 # wait 5 seconds before cleaning up pending jobs, since the results might
274 # still be incoming
275 # still be incoming
275 if self.pending[uid]:
276 if self.pending[uid]:
276 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
277 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
277 dc.start()
278 dc.start()
278 else:
279 else:
279 self.completed.pop(uid)
280 self.completed.pop(uid)
280 self.failed.pop(uid)
281 self.failed.pop(uid)
281
282
282
283
    def handle_stranded_tasks(self, engine):
        """Deal with jobs resident in an engine that died.

        For each task still pending on `engine`, fabricates an
        'apply_reply' carrying an EngineError and feeds it through
        dispatch_result, so clients get a failure instead of hanging.
        """
        lost = self.pending[engine]
        for msg_id in lost.keys():
            if msg_id not in self.pending[engine]:
                # prevent double-handling of messages
                continue

            # pending entries are (raw_msg, targets, after, follow, timeout)
            raw_msg = lost[msg_id][0]
            idents,msg = self.session.feed_identities(raw_msg, copy=False)
            parent = self.session.unpack(msg[1].bytes)
            # route the fake reply back through engine, then client ident
            idents = [engine, idents[0]]

            # build fake error reply
            try:
                raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
            except:
                # wrap_exception captures the traceback we just raised
                content = error.wrap_exception()
            # build fake header
            header = dict(
                status='error',
                engine=engine,
                date=datetime.now(),
            )
            msg = self.session.msg('apply_reply', content, parent=parent, subheader=header)
            raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
            # and dispatch it as if the engine had replied
            self.dispatch_result(raw_reply)

        # finally scrub completed/failed lists
        # NOTE(review): assumes the engine has not re-registered under the
        # same ident during the grace period — confirm
        self.completed.pop(engine)
        self.failed.pop(engine)
315
316
316
317
317 #-----------------------------------------------------------------------
318 #-----------------------------------------------------------------------
318 # Job Submission
319 # Job Submission
319 #-----------------------------------------------------------------------
320 #-----------------------------------------------------------------------
    def dispatch_submission(self, raw_msg):
        """Dispatch job submission to appropriate handlers.

        Parses a client submission, extracts scheduling metadata
        (targets, retries, after/follow dependencies, timeout), then
        either runs the task now (maybe_run) or parks it (save_unmet).
        """
        # ensure targets up to date:
        self.notifier_stream.flush()
        try:
            idents, msg = self.session.feed_identities(raw_msg, copy=False)
            msg = self.session.unserialize(msg, content=False, copy=False)
        except Exception:
            # NOTE(review): "Invaid" typo in this log message — left as-is
            self.log.error("task::Invaid task msg: %r"%raw_msg, exc_info=True)
            return


        # send to monitor
        self.mon_stream.send_multipart([b'intask']+raw_msg, copy=False)

        header = msg['header']
        msg_id = header['msg_id']
        self.all_ids.add(msg_id)

        # get targets as a set of bytes objects
        # from a list of unicode objects
        targets = header.get('targets', [])
        targets = map(asbytes, targets)
        targets = set(targets)

        # number of automatic resubmissions allowed on engine failure
        retries = header.get('retries', 0)
        self.retries[msg_id] = retries

        # time dependencies
        after = header.get('after', None)
        if after:
            after = Dependency(after)
            if after.all:
                # drop already-satisfied ids to shrink the set
                if after.success:
                    after = Dependency(after.difference(self.all_completed),
                                success=after.success,
                                failure=after.failure,
                                all=after.all,
                    )
                if after.failure:
                    after = Dependency(after.difference(self.all_failed),
                                success=after.success,
                                failure=after.failure,
                                all=after.all,
                    )
            if after.check(self.all_completed, self.all_failed):
                # recast as empty set, if `after` already met,
                # to prevent unnecessary set comparisons
                after = MET
        else:
            after = MET

        # location dependencies
        follow = Dependency(header.get('follow', []))

        # turn timeouts into datetime objects:
        timeout = header.get('timeout', None)
        if timeout:
            # cast to float, because jsonlib returns floats as decimal.Decimal,
            # which timedelta does not accept
            timeout = datetime.now() + timedelta(0,float(timeout),0)

        # this 5-tuple is the canonical stored form of a submission
        args = [raw_msg, targets, after, follow, timeout]

        # validate and reduce dependencies:
        for dep in after,follow:
            if not dep: # empty dependency
                continue
            # check valid: a task cannot depend on itself or on unknown ids
            if msg_id in dep or dep.difference(self.all_ids):
                self.depending[msg_id] = args
                return self.fail_unreachable(msg_id, error.InvalidDependency)
            # check if unreachable:
            if dep.unreachable(self.all_completed, self.all_failed):
                self.depending[msg_id] = args
                return self.fail_unreachable(msg_id)

        if after.check(self.all_completed, self.all_failed):
            # time deps already met, try to run
            if not self.maybe_run(msg_id, *args):
                # can't run yet
                if msg_id not in self.all_failed:
                    # could have failed as unreachable
                    self.save_unmet(msg_id, *args)
        else:
            self.save_unmet(msg_id, *args)
406
407
407 def audit_timeouts(self):
408 def audit_timeouts(self):
408 """Audit all waiting tasks for expired timeouts."""
409 """Audit all waiting tasks for expired timeouts."""
409 now = datetime.now()
410 now = datetime.now()
410 for msg_id in self.depending.keys():
411 for msg_id in self.depending.keys():
411 # must recheck, in case one failure cascaded to another:
412 # must recheck, in case one failure cascaded to another:
412 if msg_id in self.depending:
413 if msg_id in self.depending:
413 raw,after,targets,follow,timeout = self.depending[msg_id]
414 raw,after,targets,follow,timeout = self.depending[msg_id]
414 if timeout and timeout < now:
415 if timeout and timeout < now:
415 self.fail_unreachable(msg_id, error.TaskTimeout)
416 self.fail_unreachable(msg_id, error.TaskTimeout)
416
417
    def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
        """a task has become unreachable, send a reply with an ImpossibleDependency
        error.

        Pops the task from `depending`, detaches it from the dependency
        graph, replies to the client with `why` wrapped as an error, and
        propagates the failure through update_graph.
        """
        if msg_id not in self.depending:
            self.log.error("msg %r already failed!", msg_id)
            return
        raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
        # remove this task from every dependency's reverse-edge list
        for mid in follow.union(after):
            if mid in self.graph:
                self.graph[mid].remove(msg_id)

        # FIXME: unpacking a message I've already unpacked, but didn't save:
        idents,msg = self.session.feed_identities(raw_msg, copy=False)
        header = self.session.unpack(msg[1].bytes)

        # raise/catch so wrap_exception captures a real traceback
        try:
            raise why()
        except:
            content = error.wrap_exception()

        self.all_done.add(msg_id)
        self.all_failed.add(msg_id)

        # reply to the client, and mirror the reply to the Hub monitor
        msg = self.session.send(self.client_stream, 'apply_reply', content,
                                                parent=header, ident=idents)
        self.session.send(self.mon_stream, msg, ident=[b'outtask']+idents)

        # dependents of this task may now be unreachable themselves
        self.update_graph(msg_id, success=False)
445
446
    def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
        """check location dependencies, and run if they are met.

        Returns True if the task was submitted to an engine, False if it
        must keep waiting (or has just been failed as unreachable).
        """
        self.log.debug("Attempting to assign task %s", msg_id)
        if not self.targets:
            # no engines, definitely can't run
            return False

        blacklist = self.blacklist.setdefault(msg_id, set())
        if follow or targets or blacklist or self.hwm:
            # we need a can_run filter
            def can_run(idx):
                # check hwm: engine already at its outstanding-task cap
                if self.hwm and self.loads[idx] == self.hwm:
                    return False
                target = self.targets[idx]
                # check blacklist: engines where this task already raised
                # UnmetDependency
                if target in blacklist:
                    return False
                # check targets: explicit engine restriction from the client
                if targets and target not in targets:
                    return False
                # check follow: location dependencies satisfied on this engine
                return follow.check(self.completed[target], self.failed[target])

            # NOTE(review): relies on Python 2 filter() returning a list;
            # under Python 3 `not indices` on a filter object would misbehave
            indices = filter(can_run, range(len(self.targets)))

            if not indices:
                # couldn't run
                if follow.all:
                    # check follow for impossibility: an all-follow dependency
                    # whose prior results live on more than one engine can
                    # never be satisfied anywhere
                    dests = set()
                    relevant = set()
                    if follow.success:
                        relevant = self.all_completed
                    if follow.failure:
                        relevant = relevant.union(self.all_failed)
                    for m in follow.intersection(relevant):
                        dests.add(self.destinations[m])
                    if len(dests) > 1:
                        self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
                        self.fail_unreachable(msg_id)
                        return False
                if targets:
                    # check blacklist+targets for impossibility
                    targets.difference_update(blacklist)
                    if not targets or not targets.intersection(self.targets):
                        self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
                        self.fail_unreachable(msg_id)
                        return False
                return False
        else:
            # no restrictions at all: any engine will do
            indices = None

        self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
        return True
496
502
497 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
503 def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
498 """Save a message for later submission when its dependencies are met."""
504 """Save a message for later submission when its dependencies are met."""
499 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
505 self.depending[msg_id] = [raw_msg,targets,after,follow,timeout]
500 # track the ids in follow or after, but not those already finished
506 # track the ids in follow or after, but not those already finished
501 for dep_id in after.union(follow).difference(self.all_done):
507 for dep_id in after.union(follow).difference(self.all_done):
502 if dep_id not in self.graph:
508 if dep_id not in self.graph:
503 self.graph[dep_id] = set()
509 self.graph[dep_id] = set()
504 self.graph[dep_id].add(msg_id)
510 self.graph[dep_id].add(msg_id)
505
511
    def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
        """Submit a task to any of a subset of our targets.

        `indices`, when given, restricts the choice to those positions in
        self.targets; the scheme function picks among the candidate loads.
        """
        if indices:
            # restrict the load list to the allowed engines
            loads = [self.loads[i] for i in indices]
        else:
            loads = self.loads
        # scheme returns an index into the load list it was given
        idx = self.scheme(loads)
        if indices:
            # map back to a position in the full targets list
            idx = indices[idx]
        target = self.targets[idx]
        # print (target, map(str, msg[:3]))
        # send job to the engine: ident frame first, then the raw message
        self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
        self.engine_stream.send_multipart(raw_msg, copy=False)
        # update load
        self.add_job(idx)
        # `after` is recorded as MET — by this point time deps are satisfied
        self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
        # notify Hub
        content = dict(msg_id=msg_id, engine_id=target.decode('ascii'))
        self.session.send(self.mon_stream, 'task_destination', content=content,
                        ident=[b'tracktask',self.ident])
527
533
528
534
529 #-----------------------------------------------------------------------
535 #-----------------------------------------------------------------------
530 # Result Handling
536 # Result Handling
531 #-----------------------------------------------------------------------
537 #-----------------------------------------------------------------------
    def dispatch_result(self, raw_msg):
        """dispatch method for result replies.

        Routes an engine's reply either to handle_result (real outcome,
        possibly consuming a retry) or to handle_unmet_dependency.
        """
        try:
            idents,msg = self.session.feed_identities(raw_msg, copy=False)
            msg = self.session.unserialize(msg, content=False, copy=False)
            # first routing ident is the engine that produced the reply
            engine = idents[0]
            try:
                idx = self.targets.index(engine)
            except ValueError:
                pass # skip load-update for dead engines
            else:
                self.finish_job(idx)
        except Exception:
            # NOTE(review): "Invaid" typo in this log message — left as-is
            self.log.error("task::Invaid result: %r", raw_msg, exc_info=True)
            return

        header = msg['header']
        parent = msg['parent_header']
        if header.get('dependencies_met', True):
            success = (header['status'] == 'ok')
            msg_id = parent['msg_id']
            retries = self.retries[msg_id]
            if not success and retries > 0:
                # failed, but retries remain: treat like an unmet dependency
                # so the task is resubmitted elsewhere
                self.retries[msg_id] = retries - 1
                self.handle_unmet_dependency(idents, parent)
            else:
                del self.retries[msg_id]
                # relay to client and update graph
                self.handle_result(idents, parent, raw_msg, success)
                # send to Hub monitor
                self.mon_stream.send_multipart([b'outtask']+raw_msg, copy=False)
        else:
            self.handle_unmet_dependency(idents, parent)
566
572
567 def handle_result(self, idents, parent, raw_msg, success=True):
573 def handle_result(self, idents, parent, raw_msg, success=True):
568 """handle a real task result, either success or failure"""
574 """handle a real task result, either success or failure"""
569 # first, relay result to client
575 # first, relay result to client
570 engine = idents[0]
576 engine = idents[0]
571 client = idents[1]
577 client = idents[1]
572 # swap_ids for XREP-XREP mirror
578 # swap_ids for XREP-XREP mirror
573 raw_msg[:2] = [client,engine]
579 raw_msg[:2] = [client,engine]
574 # print (map(str, raw_msg[:4]))
580 # print (map(str, raw_msg[:4]))
575 self.client_stream.send_multipart(raw_msg, copy=False)
581 self.client_stream.send_multipart(raw_msg, copy=False)
576 # now, update our data structures
582 # now, update our data structures
577 msg_id = parent['msg_id']
583 msg_id = parent['msg_id']
578 self.blacklist.pop(msg_id, None)
584 self.blacklist.pop(msg_id, None)
579 self.pending[engine].pop(msg_id)
585 self.pending[engine].pop(msg_id)
580 if success:
586 if success:
581 self.completed[engine].add(msg_id)
587 self.completed[engine].add(msg_id)
582 self.all_completed.add(msg_id)
588 self.all_completed.add(msg_id)
583 else:
589 else:
584 self.failed[engine].add(msg_id)
590 self.failed[engine].add(msg_id)
585 self.all_failed.add(msg_id)
591 self.all_failed.add(msg_id)
586 self.all_done.add(msg_id)
592 self.all_done.add(msg_id)
587 self.destinations[msg_id] = engine
593 self.destinations[msg_id] = engine
588
594
589 self.update_graph(msg_id, success)
595 self.update_graph(msg_id, success)
590
596
    def handle_unmet_dependency(self, idents, parent):
        """handle an unmet dependency.

        The engine bounced the task back (its location deps were not
        satisfied there, or a retry was requested), so blacklist that
        engine for this task and try to place it somewhere else.
        """
        engine = idents[0]
        msg_id = parent['msg_id']

        # remember that this engine can't run this task
        if msg_id not in self.blacklist:
            self.blacklist[msg_id] = set()
        self.blacklist[msg_id].add(engine)

        args = self.pending[engine].pop(msg_id)
        raw,targets,after,follow,timeout = args

        if self.blacklist[msg_id] == targets:
            # every allowed engine has rejected the task: it is unreachable
            self.depending[msg_id] = args
            self.fail_unreachable(msg_id)
        elif not self.maybe_run(msg_id, *args):
            # resubmit failed
            if msg_id not in self.all_failed:
                # put it back in our dependency tree
                self.save_unmet(msg_id, *args)

        if self.hwm:
            try:
                idx = self.targets.index(engine)
            except ValueError:
                pass # skip load-update for dead engines
            else:
                if self.loads[idx] == self.hwm-1:
                    # the bounce freed a slot on a previously-full engine;
                    # see if any held task can use it
                    self.update_graph(None)
626
621
627
622
628
def update_graph(self, dep_id=None, success=True):
    """dep_id just finished. Update our dependency
    graph and submit any jobs that just became runable.

    Called with dep_id=None to update entire graph for hwm, but without finishing
    a task.

    Parameters
    ----------
    dep_id : str or None
        msg_id of the task that just finished, or None to recheck all
        waiting tasks (used when an engine drops below the high-water mark).
    success : bool
        whether dep_id finished successfully; forwarded to dependency checks
        via the all_completed / all_failed sets maintained by the caller.
    """
    # print ("\n\n***********")
    # pprint (dep_id)
    # pprint (self.graph)
    # pprint (self.depending)
    # pprint (self.all_completed)
    # pprint (self.all_failed)
    # print ("\n\n***********\n\n")
    # update any jobs that depended on the dependency
    jobs = self.graph.pop(dep_id, [])

    # recheck *all* jobs if
    # a) we have HWM and an engine just become no longer full
    # or b) dep_id was given as None
    if dep_id is None or self.hwm and any(load == self.hwm-1 for load in self.loads):
        # materialize the keys: the loop below pops from self.depending,
        # and iterating a live keys() view while mutating the dict raises
        # RuntimeError on Python 3
        jobs = list(self.depending.keys())

    for msg_id in jobs:
        raw_msg, targets, after, follow, timeout = self.depending[msg_id]

        if after.unreachable(self.all_completed, self.all_failed)\
                or follow.unreachable(self.all_completed, self.all_failed):
            self.fail_unreachable(msg_id)

        elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
            if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):

                self.depending.pop(msg_id)
                # the task was submitted: remove it from the graph edges
                # of everything it was waiting on
                for mid in follow.union(after):
                    if mid in self.graph:
                        self.graph[mid].remove(msg_id)
661 #----------------------------------------------------------------------
667 #----------------------------------------------------------------------
662 # methods to be overridden by subclasses
668 # methods to be overridden by subclasses
663 #----------------------------------------------------------------------
669 #----------------------------------------------------------------------
664
670
def add_job(self, idx):
    """Record that self.targets[idx] was just handed a job.

    Subclasses may override for other orderings; the default keeps the
    engine lists in least-recently-used order, with loads counting
    outstanding jobs per engine.
    """
    # one more outstanding job on that engine
    self.loads[idx] = self.loads[idx] + 1
    # rotate the engine and its load entry to the back of their lists,
    # so the least-recently-used engine sits at the front
    self.targets.append(self.targets.pop(idx))
    self.loads.append(self.loads.pop(idx))
def finish_job(self, idx):
    """Record that self.targets[idx] just completed a job.

    Subclasses may override; the default simply decrements that
    engine's outstanding-job count.
    """
    # one fewer outstanding job on that engine
    self.loads[idx] = self.loads[idx] - 1
def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
                        logname='root', log_url=None, loglevel=logging.DEBUG,
                        identity=b'task', in_thread=False):
    """Create, wire up, and start a TaskScheduler.

    Binds ROUTER sockets for clients (in_addr) and engines (out_addr),
    connects a PUB socket to the monitor (mon_addr) and a SUB socket to
    the notification channel (not_addr), then starts the scheduler and —
    unless running in a thread — its own IOLoop.

    Parameters
    ----------
    in_addr, out_addr, mon_addr, not_addr : str
        zmq URLs for the client, engine, monitor, and notification channels.
    config : dict or None
        plain-dict form of a Config; re-wrapped into Config here because
        it may have crossed a process boundary.
    logname, log_url, loglevel :
        logging setup; log_url enables a zmq-connected logger.
    identity : bytes
        zmq IDENTITY for the client/engine ROUTER sockets.
    in_thread : bool
        True when the scheduler runs inside the parent process (shares the
        parent's Context/IOLoop/log and does not start the loop itself).
    """

    ZMQStream = zmqstream.ZMQStream

    if config:
        # unwrap dict back into Config
        config = Config(config)

    if in_thread:
        # use instance() to get the same Context/Loop as our parent
        ctx = zmq.Context.instance()
        loop = ioloop.IOLoop.instance()
    else:
        # in a process, don't use instance()
        # for safety with multiprocessing
        ctx = zmq.Context()
        loop = ioloop.IOLoop()
    # client-facing socket
    ins = ZMQStream(ctx.socket(zmq.ROUTER),loop)
    ins.setsockopt(zmq.IDENTITY, identity)
    ins.bind(in_addr)

    # engine-facing socket
    outs = ZMQStream(ctx.socket(zmq.ROUTER),loop)
    outs.setsockopt(zmq.IDENTITY, identity)
    outs.bind(out_addr)
    # monitor publisher
    mons = zmqstream.ZMQStream(ctx.socket(zmq.PUB),loop)
    mons.connect(mon_addr)
    # notification subscriber (subscribe to everything)
    nots = zmqstream.ZMQStream(ctx.socket(zmq.SUB),loop)
    nots.setsockopt(zmq.SUBSCRIBE, b'')
    nots.connect(not_addr)

    # setup logging.
    if in_thread:
        log = Application.instance().log
    else:
        if log_url:
            log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
        else:
            log = local_logger(logname, loglevel)

    scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
                            mon_stream=mons, notifier_stream=nots,
                            loop=loop, log=log,
                            config=config)
    scheduler.start()
    if not in_thread:
        try:
            loop.start()
        except KeyboardInterrupt:
            scheduler.log.critical("Interrupted, exiting...")
@@ -1,316 +1,324 b''
1 """Tests for parallel client.py
1 """Tests for parallel client.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23 from tempfile import mktemp
23 from tempfile import mktemp
24
24
25 import zmq
25 import zmq
26
26
27 from IPython.parallel.client import client as clientmod
27 from IPython.parallel.client import client as clientmod
28 from IPython.parallel import error
28 from IPython.parallel import error
29 from IPython.parallel import AsyncResult, AsyncHubResult
29 from IPython.parallel import AsyncResult, AsyncHubResult
30 from IPython.parallel import LoadBalancedView, DirectView
30 from IPython.parallel import LoadBalancedView, DirectView
31
31
32 from clienttest import ClusterTestCase, segfault, wait, add_engines
32 from clienttest import ClusterTestCase, segfault, wait, add_engines
33
33
def setup():
    # nose module-level setup: ensure at least 4 engines are running
    # before any test in this module executes
    add_engines(4)
36
class TestClient(ClusterTestCase):
    """Integration tests for the parallel Client.

    Runs against a live test cluster (profile 'iptest'); `self.client`
    is provided by ClusterTestCase. Many tests sleep briefly to let the
    Hub's monitor observe messages before querying it.
    """

    def test_ids(self):
        n = len(self.client.ids)
        self.add_engines(3)
        self.assertEquals(len(self.client.ids), n+3)

    def test_view_indexing(self):
        """test index access for views"""
        self.add_engines(2)
        targets = self.client._build_targets('all')[-1]
        v = self.client[:]
        self.assertEquals(v.targets, targets)
        t = self.client.ids[2]
        v = self.client[t]
        self.assert_(isinstance(v, DirectView))
        self.assertEquals(v.targets, t)
        t = self.client.ids[2:4]
        v = self.client[t]
        self.assert_(isinstance(v, DirectView))
        self.assertEquals(v.targets, t)
        v = self.client[::2]
        self.assert_(isinstance(v, DirectView))
        self.assertEquals(v.targets, targets[::2])
        v = self.client[1::3]
        self.assert_(isinstance(v, DirectView))
        self.assertEquals(v.targets, targets[1::3])
        v = self.client[:-3]
        self.assert_(isinstance(v, DirectView))
        self.assertEquals(v.targets, targets[:-3])
        v = self.client[-1]
        self.assert_(isinstance(v, DirectView))
        self.assertEquals(v.targets, targets[-1])
        # only int/slice/list-of-ints are valid indices
        self.assertRaises(TypeError, lambda : self.client[None])

    def test_lbview_targets(self):
        """test load_balanced_view targets"""
        v = self.client.load_balanced_view()
        self.assertEquals(v.targets, None)
        v = self.client.load_balanced_view(-1)
        self.assertEquals(v.targets, [self.client.ids[-1]])
        v = self.client.load_balanced_view('all')
        self.assertEquals(v.targets, None)

    def test_dview_targets(self):
        """test direct_view targets"""
        v = self.client.direct_view()
        self.assertEquals(v.targets, 'all')
        v = self.client.direct_view('all')
        self.assertEquals(v.targets, 'all')
        v = self.client.direct_view(-1)
        self.assertEquals(v.targets, self.client.ids[-1])

    def test_lazy_all_targets(self):
        """test lazy evaluation of rc.direct_view('all')"""
        v = self.client.direct_view()
        self.assertEquals(v.targets, 'all')

        def double(x):
            return x*2
        seq = range(100)
        ref = [ double(x) for x in seq ]

        # add some engines, which should be used
        self.add_engines(2)
        n1 = len(self.client.ids)

        # simple apply
        r = v.apply_sync(lambda : 1)
        self.assertEquals(r, [1] * n1)

        # map goes through remotefunction
        r = v.map_sync(double, seq)
        self.assertEquals(r, ref)

        # add a couple more engines, and try again
        self.add_engines(2)
        n2 = len(self.client.ids)
        self.assertNotEquals(n2, n1)

        # apply
        r = v.apply_sync(lambda : 1)
        self.assertEquals(r, [1] * n2)

        # map
        r = v.map_sync(double, seq)
        self.assertEquals(r, ref)

    def test_targets(self):
        """test various valid targets arguments"""
        build = self.client._build_targets
        ids = self.client.ids
        idents,targets = build(None)
        self.assertEquals(ids, targets)

    def test_clear(self):
        """test clear behavior"""
        # self.add_engines(2)
        v = self.client[:]
        v.block=True
        v.push(dict(a=5))
        v.pull('a')
        id0 = self.client.ids[-1]
        # clear only one engine; 'a' must survive on the others
        self.client.clear(targets=id0, block=True)
        a = self.client[:-1].get('a')
        self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
        # clear everywhere; 'a' must be gone on every engine
        self.client.clear(block=True)
        for i in self.client.ids:
            # print i
            self.assertRaisesRemote(NameError, self.client[i].get, 'a')

    def test_get_result(self):
        """test getting results from the Hub."""
        c = clientmod.Client(profile='iptest')
        # self.add_engines(1)
        t = c.ids[-1]
        ar = c[t].apply_async(wait, 1)
        # give the monitor time to notice the message
        time.sleep(.25)
        # first fetch goes via the Hub (AsyncHubResult);
        # a second fetch of the same ids is served locally
        ahr = self.client.get_result(ar.msg_ids)
        self.assertTrue(isinstance(ahr, AsyncHubResult))
        self.assertEquals(ahr.get(), ar.get())
        ar2 = self.client.get_result(ar.msg_ids)
        self.assertFalse(isinstance(ar2, AsyncHubResult))
        c.close()

    def test_ids_list(self):
        """test client.ids"""
        # self.add_engines(2)
        ids = self.client.ids
        # .ids must be an equal copy, not the internal list itself
        self.assertEquals(ids, self.client._ids)
        self.assertFalse(ids is self.client._ids)
        ids.remove(ids[-1])
        self.assertNotEquals(ids, self.client._ids)

    def test_queue_status(self):
        # self.addEngine(4)
        ids = self.client.ids
        id0 = ids[0]
        qs = self.client.queue_status(targets=id0)
        self.assertTrue(isinstance(qs, dict))
        self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
        allqs = self.client.queue_status()
        self.assertTrue(isinstance(allqs, dict))
        # full status maps engine id -> per-engine dict, plus 'unassigned'
        intkeys = list(allqs.keys())
        intkeys.remove('unassigned')
        self.assertEquals(sorted(intkeys), sorted(self.client.ids))
        unassigned = allqs.pop('unassigned')
        for eid,qs in allqs.items():
            self.assertTrue(isinstance(qs, dict))
            self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])

    def test_shutdown(self):
        # self.addEngine(4)
        ids = self.client.ids
        id0 = ids[0]
        self.client.shutdown(id0, block=True)
        # poll until the Hub notices the engine is gone
        while id0 in self.client.ids:
            time.sleep(0.1)
            self.client.spin()

        self.assertRaises(IndexError, lambda : self.client[id0])

    def test_result_status(self):
        pass
        # to be written

    def test_db_query_dt(self):
        """test db query by date"""
        hist = self.client.hub_history()
        middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
        tic = middle['submitted']
        before = self.client.db_query({'submitted' : {'$lt' : tic}})
        after = self.client.db_query({'submitted' : {'$gte' : tic}})
        # $lt / $gte must partition the history exactly
        self.assertEquals(len(before)+len(after),len(hist))
        for b in before:
            self.assertTrue(b['submitted'] < tic)
        for a in after:
            self.assertTrue(a['submitted'] >= tic)
        same = self.client.db_query({'submitted' : tic})
        for s in same:
            self.assertTrue(s['submitted'] == tic)

    def test_db_query_keys(self):
        """test extracting subset of record keys"""
        found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
        for rec in found:
            self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))

    def test_db_query_default_keys(self):
        """default db_query excludes buffers"""
        found = self.client.db_query({'msg_id': {'$ne' : ''}})
        for rec in found:
            keys = set(rec.keys())
            self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
            self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)

    def test_db_query_msg_id(self):
        """ensure msg_id is always in db queries"""
        found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
        for rec in found:
            self.assertTrue('msg_id' in rec.keys())
        found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
        for rec in found:
            self.assertTrue('msg_id' in rec.keys())
        found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
        for rec in found:
            self.assertTrue('msg_id' in rec.keys())

    def test_db_query_in(self):
        """test db query with '$in','$nin' operators"""
        hist = self.client.hub_history()
        even = hist[::2]
        odd = hist[1::2]
        recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
        found = [ r['msg_id'] for r in recs ]
        self.assertEquals(set(even), set(found))
        recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
        found = [ r['msg_id'] for r in recs ]
        self.assertEquals(set(odd), set(found))

    def test_hub_history(self):
        hist = self.client.hub_history()
        recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
        recdict = {}
        for rec in recs:
            recdict[rec['msg_id']] = rec

        # history must be ordered by submission time
        latest = datetime(1984,1,1)
        for msg_id in hist:
            rec = recdict[msg_id]
            newt = rec['submitted']
            self.assertTrue(newt >= latest)
            latest = newt
        ar = self.client[-1].apply_async(lambda : 1)
        ar.get()
        time.sleep(0.25)
        self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)

    def test_resubmit(self):
        def f():
            import random
            return random.random()
        v = self.client.load_balanced_view()
        ar = v.apply_async(f)
        r1 = ar.get(1)
        # give the Hub a chance to notice:
        time.sleep(0.5)
        ahr = self.client.resubmit(ar.msg_ids)
        r2 = ahr.get(1)
        # resubmission re-executes, so a fresh random value is expected
        self.assertFalse(r1 == r2)

    def test_resubmit_inflight(self):
        """ensure ValueError on resubmit of inflight task"""
        v = self.client.load_balanced_view()
        ar = v.apply_async(time.sleep,1)
        # give the message a chance to arrive
        time.sleep(0.2)
        self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids)
        ar.get(2)

    def test_resubmit_badkey(self):
        """ensure KeyError on resubmit of nonexistant task"""
        self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])

    def test_purge_results(self):
        # ensure there are some tasks
        for i in range(5):
            self.client[:].apply_sync(lambda : 1)
        # Wait for the Hub to realise the result is done:
        # This prevents a race condition, where we
        # might purge a result the Hub still thinks is pending.
        time.sleep(0.1)
        rc2 = clientmod.Client(profile='iptest')
        hist = self.client.hub_history()
        ahr = rc2.get_result([hist[-1]])
        ahr.wait(10)
        self.client.purge_results(hist[-1])
        newhist = self.client.hub_history()
        # exactly one record should have been removed
        self.assertEquals(len(newhist)+1,len(hist))
        rc2.spin()
        rc2.close()

    def test_purge_all_results(self):
        self.client.purge_results('all')
        hist = self.client.hub_history()
        self.assertEquals(len(hist), 0)
General Comments 0
You need to be logged in to leave comments. Login now