##// END OF EJS Templates
resubmitted tasks are now wholly separate (new msg_ids)...
MinRK -
Show More
@@ -1,1570 +1,1566 b''
1 """A semi-synchronous Client for the ZMQ cluster
1 """A semi-synchronous Client for the ZMQ cluster
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import json
19 import json
20 import sys
20 import sys
21 from threading import Thread, Event
21 from threading import Thread, Event
22 import time
22 import time
23 import warnings
23 import warnings
24 from datetime import datetime
24 from datetime import datetime
25 from getpass import getpass
25 from getpass import getpass
26 from pprint import pprint
26 from pprint import pprint
27
27
28 pjoin = os.path.join
28 pjoin = os.path.join
29
29
30 import zmq
30 import zmq
31 # from zmq.eventloop import ioloop, zmqstream
31 # from zmq.eventloop import ioloop, zmqstream
32
32
33 from IPython.config.configurable import MultipleInstanceError
33 from IPython.config.configurable import MultipleInstanceError
34 from IPython.core.application import BaseIPythonApplication
34 from IPython.core.application import BaseIPythonApplication
35
35
36 from IPython.utils.jsonutil import rekey
36 from IPython.utils.jsonutil import rekey
37 from IPython.utils.localinterfaces import LOCAL_IPS
37 from IPython.utils.localinterfaces import LOCAL_IPS
38 from IPython.utils.path import get_ipython_dir
38 from IPython.utils.path import get_ipython_dir
39 from IPython.utils.py3compat import cast_bytes
39 from IPython.utils.py3compat import cast_bytes
40 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
40 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
41 Dict, List, Bool, Set, Any)
41 Dict, List, Bool, Set, Any)
42 from IPython.external.decorator import decorator
42 from IPython.external.decorator import decorator
43 from IPython.external.ssh import tunnel
43 from IPython.external.ssh import tunnel
44
44
45 from IPython.parallel import Reference
45 from IPython.parallel import Reference
46 from IPython.parallel import error
46 from IPython.parallel import error
47 from IPython.parallel import util
47 from IPython.parallel import util
48
48
49 from IPython.zmq.session import Session, Message
49 from IPython.zmq.session import Session, Message
50
50
51 from .asyncresult import AsyncResult, AsyncHubResult
51 from .asyncresult import AsyncResult, AsyncHubResult
52 from IPython.core.profiledir import ProfileDir, ProfileDirError
52 from IPython.core.profiledir import ProfileDir, ProfileDirError
53 from .view import DirectView, LoadBalancedView
53 from .view import DirectView, LoadBalancedView
54
54
# Python 3 compatibility shim: this module still refers to `xrange`
# (in isinstance checks, e.g. _build_targets), which py3 removed.
if sys.version_info[0] >= 3:
    # xrange is used in a couple 'isinstance' tests in py2
    # should be just 'range' in 3k
    xrange = range
59
59
60 #--------------------------------------------------------------------------
60 #--------------------------------------------------------------------------
61 # Decorators for Client methods
61 # Decorators for Client methods
62 #--------------------------------------------------------------------------
62 #--------------------------------------------------------------------------
63
63
@decorator
def spin_first(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method.

    Decorator for Client methods: flushes incoming results and
    registration-state changes before the wrapped method runs, so the
    method sees up-to-date client state.  Per the `decorator` package's
    convention, `f` is the wrapped function and `self` the Client
    instance on which it was invoked.
    """
    self.spin()
    return f(self, *args, **kwargs)
69
69
70
70
71 #--------------------------------------------------------------------------
71 #--------------------------------------------------------------------------
72 # Classes
72 # Classes
73 #--------------------------------------------------------------------------
73 #--------------------------------------------------------------------------
74
74
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        # Canonical key set.  dict.update bypasses our __setitem__
        # override, so these defaults (and constructor args) are installed
        # without triggering key validation.
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,

              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
              'outputs' : [],
            }
        self.update(md)
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # `key in self` replaces the old `key in self.iterkeys()`: identical
        # semantics, but O(1), idiomatic, and Python 3 compatible.
        if key in self:
            return self[key]
        else:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key in self:
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
126
126
127
127
128 class Client(HasTraits):
128 class Client(HasTraits):
129 """A semi-synchronous client to the IPython ZMQ cluster
129 """A semi-synchronous client to the IPython ZMQ cluster
130
130
131 Parameters
131 Parameters
132 ----------
132 ----------
133
133
134 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
134 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
135 Connection information for the Hub's registration. If a json connector
135 Connection information for the Hub's registration. If a json connector
136 file is given, then likely no further configuration is necessary.
136 file is given, then likely no further configuration is necessary.
137 [Default: use profile]
137 [Default: use profile]
138 profile : bytes
138 profile : bytes
139 The name of the Cluster profile to be used to find connector information.
139 The name of the Cluster profile to be used to find connector information.
140 If run from an IPython application, the default profile will be the same
140 If run from an IPython application, the default profile will be the same
141 as the running application, otherwise it will be 'default'.
141 as the running application, otherwise it will be 'default'.
142 context : zmq.Context
142 context : zmq.Context
143 Pass an existing zmq.Context instance, otherwise the client will create its own.
143 Pass an existing zmq.Context instance, otherwise the client will create its own.
144 debug : bool
144 debug : bool
145 flag for lots of message printing for debug purposes
145 flag for lots of message printing for debug purposes
146 timeout : int/float
146 timeout : int/float
147 time (in seconds) to wait for connection replies from the Hub
147 time (in seconds) to wait for connection replies from the Hub
148 [Default: 10]
148 [Default: 10]
149
149
150 #-------------- session related args ----------------
150 #-------------- session related args ----------------
151
151
152 config : Config object
152 config : Config object
153 If specified, this will be relayed to the Session for configuration
153 If specified, this will be relayed to the Session for configuration
154 username : str
154 username : str
155 set username for the session object
155 set username for the session object
156 packer : str (import_string) or callable
156 packer : str (import_string) or callable
157 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
157 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
158 function to serialize messages. Must support same input as
158 function to serialize messages. Must support same input as
159 JSON, and output must be bytes.
159 JSON, and output must be bytes.
160 You can pass a callable directly as `pack`
160 You can pass a callable directly as `pack`
161 unpacker : str (import_string) or callable
161 unpacker : str (import_string) or callable
162 The inverse of packer. Only necessary if packer is specified as *not* one
162 The inverse of packer. Only necessary if packer is specified as *not* one
163 of 'json' or 'pickle'.
163 of 'json' or 'pickle'.
164
164
165 #-------------- ssh related args ----------------
165 #-------------- ssh related args ----------------
166 # These are args for configuring the ssh tunnel to be used
166 # These are args for configuring the ssh tunnel to be used
167 # credentials are used to forward connections over ssh to the Controller
167 # credentials are used to forward connections over ssh to the Controller
168 # Note that the ip given in `addr` needs to be relative to sshserver
168 # Note that the ip given in `addr` needs to be relative to sshserver
169 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
169 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
170 # and set sshserver as the same machine the Controller is on. However,
170 # and set sshserver as the same machine the Controller is on. However,
171 # the only requirement is that sshserver is able to see the Controller
171 # the only requirement is that sshserver is able to see the Controller
172 # (i.e. is within the same trusted network).
172 # (i.e. is within the same trusted network).
173
173
174 sshserver : str
174 sshserver : str
175 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
175 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
176 If keyfile or password is specified, and this is not, it will default to
176 If keyfile or password is specified, and this is not, it will default to
177 the ip given in addr.
177 the ip given in addr.
178 sshkey : str; path to ssh private key file
178 sshkey : str; path to ssh private key file
179 This specifies a key to be used in ssh login, default None.
179 This specifies a key to be used in ssh login, default None.
180 Regular default ssh keys will be used without specifying this argument.
180 Regular default ssh keys will be used without specifying this argument.
181 password : str
181 password : str
182 Your ssh password to sshserver. Note that if this is left None,
182 Your ssh password to sshserver. Note that if this is left None,
183 you will be prompted for it if passwordless key based login is unavailable.
183 you will be prompted for it if passwordless key based login is unavailable.
184 paramiko : bool
184 paramiko : bool
185 flag for whether to use paramiko instead of shell ssh for tunneling.
185 flag for whether to use paramiko instead of shell ssh for tunneling.
186 [default: True on win32, False else]
186 [default: True on win32, False else]
187
187
188 ------- exec authentication args -------
188 ------- exec authentication args -------
189 If even localhost is untrusted, you can have some protection against
189 If even localhost is untrusted, you can have some protection against
190 unauthorized execution by signing messages with HMAC digests.
190 unauthorized execution by signing messages with HMAC digests.
191 Messages are still sent as cleartext, so if someone can snoop your
191 Messages are still sent as cleartext, so if someone can snoop your
192 loopback traffic this will not protect your privacy, but will prevent
192 loopback traffic this will not protect your privacy, but will prevent
193 unauthorized execution.
193 unauthorized execution.
194
194
195 exec_key : str
195 exec_key : str
196 an authentication key or file containing a key
196 an authentication key or file containing a key
197 default: None
197 default: None
198
198
199
199
200 Attributes
200 Attributes
201 ----------
201 ----------
202
202
203 ids : list of int engine IDs
203 ids : list of int engine IDs
204 requesting the ids attribute always synchronizes
204 requesting the ids attribute always synchronizes
205 the registration state. To request ids without synchronization,
205 the registration state. To request ids without synchronization,
206 use semi-private _ids attributes.
206 use semi-private _ids attributes.
207
207
208 history : list of msg_ids
208 history : list of msg_ids
209 a list of msg_ids, keeping track of all the execution
209 a list of msg_ids, keeping track of all the execution
210 messages you have submitted in order.
210 messages you have submitted in order.
211
211
212 outstanding : set of msg_ids
212 outstanding : set of msg_ids
213 a set of msg_ids that have been submitted, but whose
213 a set of msg_ids that have been submitted, but whose
214 results have not yet been received.
214 results have not yet been received.
215
215
216 results : dict
216 results : dict
217 a dict of all our results, keyed by msg_id
217 a dict of all our results, keyed by msg_id
218
218
219 block : bool
219 block : bool
220 determines default behavior when block not specified
220 determines default behavior when block not specified
221 in execution methods
221 in execution methods
222
222
223 Methods
223 Methods
224 -------
224 -------
225
225
226 spin
226 spin
227 flushes incoming results and registration state changes
227 flushes incoming results and registration state changes
228 control methods spin, and requesting `ids` also ensures up to date
228 control methods spin, and requesting `ids` also ensures up to date
229
229
230 wait
230 wait
231 wait on one or more msg_ids
231 wait on one or more msg_ids
232
232
233 execution methods
233 execution methods
234 apply
234 apply
235 legacy: execute, run
235 legacy: execute, run
236
236
237 data movement
237 data movement
238 push, pull, scatter, gather
238 push, pull, scatter, gather
239
239
240 query methods
240 query methods
241 queue_status, get_result, purge, result_status
241 queue_status, get_result, purge, result_status
242
242
243 control methods
243 control methods
244 abort, shutdown
244 abort, shutdown
245
245
246 """
246 """
247
247
248
248
    # default blocking behavior for execution methods when `block` is not
    # passed explicitly (see class docstring)
    block = Bool(False)
    # msg_ids submitted but whose results have not yet been received
    outstanding = Set()
    # msg_id -> result dict, for everything we have received
    results = Instance('collections.defaultdict', (dict,))
    # msg_id -> Metadata record (defaultdict factory builds empty Metadata)
    metadata = Instance('collections.defaultdict', (Metadata,))
    # all execution msg_ids we have submitted, in order
    history = List()
    # forwarded to Session.debug in __init__ for verbose message printing
    debug = Bool(False)
    # background-spin machinery; _stop_spinning is set to a threading.Event
    # in __init__.  _spin_thread presumably holds the spin Thread -- confirm
    # against the spin-thread methods elsewhere in this class.
    _spin_thread = Any()
    _stop_spinning = Any()
257
257
258 profile=Unicode()
258 profile=Unicode()
259 def _profile_default(self):
259 def _profile_default(self):
260 if BaseIPythonApplication.initialized():
260 if BaseIPythonApplication.initialized():
261 # an IPython app *might* be running, try to get its profile
261 # an IPython app *might* be running, try to get its profile
262 try:
262 try:
263 return BaseIPythonApplication.instance().profile
263 return BaseIPythonApplication.instance().profile
264 except (AttributeError, MultipleInstanceError):
264 except (AttributeError, MultipleInstanceError):
265 # could be a *different* subclass of config.Application,
265 # could be a *different* subclass of config.Application,
266 # which would raise one of these two errors.
266 # which would raise one of these two errors.
267 return u'default'
267 return u'default'
268 else:
268 else:
269 return u'default'
269 return u'default'
270
270
271
271
    # defaultdict of sets of outstanding msg_ids -- presumably keyed per
    # engine; confirm against the result-handling methods.
    _outstanding_dict = Instance('collections.defaultdict', (set,))
    # sorted list of registered engine ids (maintained by _update_engines)
    _ids = List()
    # guard so _connect only runs once
    _connected=Bool(False)
    # whether connections are tunneled over ssh (set in __init__)
    _ssh=Bool(False)
    # zmq context; defaults to the process-global zmq.Context.instance()
    _context = Instance('zmq.Context')
    # connection info merged from arguments and the JSON connection file
    _config = Dict()
    # bidirectional engine id <-> uuid mapping
    _engines=Instance(util.ReverseDict, (), {})
    # _hub_socket=Instance('zmq.Socket')
    # one socket per controller channel
    _query_socket=Instance('zmq.Socket')
    _control_socket=Instance('zmq.Socket')
    _iopub_socket=Instance('zmq.Socket')
    _notification_socket=Instance('zmq.Socket')
    _mux_socket=Instance('zmq.Socket')
    _task_socket=Instance('zmq.Socket')
    # task scheduler scheme; 'pure' disables task farming on engine loss
    # (see _update_engines / _stop_scheduling_tasks)
    _task_scheme=Unicode()
    # plain class attribute, not a trait: True once close() has run
    _closed = False
    # counts of replies discarded after purging/ignoring requests
    _ignored_control_replies=Integer(0)
    _ignored_hub_replies=Integer(0)
290
290
    def __new__(self, *args, **kw):
        """Forward only keyword arguments to HasTraits.__new__.

        Client.__init__ accepts positional arguments (url_or_file, ...);
        they are dropped here so HasTraits does not see them.
        """
        # don't raise on positional args
        return HasTraits.__new__(self, **kw)
294
294
    def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
            context=None, debug=False, exec_key=None,
            sshserver=None, sshkey=None, password=None, paramiko=None,
            timeout=10, **extra_args
            ):
        """Locate connection info, configure the Session, connect the query
        socket (possibly over ssh), and register message handlers.

        See the class docstring for full parameter documentation.
        """
        # only pass `profile` up if explicitly given, so the
        # _profile_default trait logic runs otherwise
        if profile:
            super(Client, self).__init__(debug=debug, profile=profile)
        else:
            super(Client, self).__init__(debug=debug)
        if context is None:
            # share the process-global context unless the caller supplied one
            context = zmq.Context.instance()
        self._context = context
        self._stop_spinning = Event()

        self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
        if self._cd is not None:
            if url_or_file is None:
                # default to the client connection file in the profile's
                # security directory
                url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
        assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
            " Please specify at least one of url_or_file or profile."

        if not util.is_url(url_or_file):
            # it's not a url, try for a file
            if not os.path.exists(url_or_file):
                # bare filenames are resolved relative to the security dir
                if self._cd:
                    url_or_file = os.path.join(self._cd.security_dir, url_or_file)
                assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
            with open(url_or_file) as f:
                cfg = json.loads(f.read())
        else:
            cfg = {'url':url_or_file}

        # sync defaults from args, json:
        # explicit constructor arguments win over the JSON file's values
        if sshserver:
            cfg['ssh'] = sshserver
        if exec_key:
            cfg['exec_key'] = exec_key
        # NOTE(review): this lookup is unconditional, so a cfg built from a
        # bare url with no exec_key argument would raise KeyError here --
        # confirm that connection files always carry an 'exec_key' entry.
        exec_key = cfg['exec_key']
        location = cfg.setdefault('location', None)
        cfg['url'] = util.disambiguate_url(cfg['url'], location)
        url = cfg['url']
        proto,addr,port = util.split_url(url)
        if location is not None and addr == '127.0.0.1':
            # location specified, and connection is expected to be local
            if location not in LOCAL_IPS and not sshserver:
                # load ssh from JSON *only* if the controller is not on
                # this machine
                sshserver=cfg['ssh']
            if location not in LOCAL_IPS and not sshserver:
                # warn if no ssh specified, but SSH is probably needed
                # This is only a warning, because the most likely cause
                # is a local Controller on a laptop whose IP is dynamic
                warnings.warn("""
            Controller appears to be listening on localhost, but not on this machine.
            If this is true, you should specify Client(...,sshserver='you@%s')
            or instruct your controller to listen on an external IP."""%location,
                RuntimeWarning)
        elif not sshserver:
            # otherwise sync with cfg
            sshserver = cfg['ssh']

        self._config = cfg

        self._ssh = bool(sshserver or sshkey or password)
        if self._ssh and sshserver is None:
            # default to ssh via localhost
            sshserver = url.split('://')[1].split(':')[0]
        if self._ssh and password is None:
            if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
                # key-based login works; no password needed
                password=False
            else:
                password = getpass("SSH Password for %s: "%sshserver)
        ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)

        # configure and construct the session
        if exec_key is not None:
            if os.path.isfile(exec_key):
                # a path: let the Session read the key from the file
                extra_args['keyfile'] = exec_key
            else:
                # a literal key: must be bytes for HMAC signing
                exec_key = cast_bytes(exec_key)
                extra_args['key'] = exec_key
        self.session = Session(**extra_args)

        self._query_socket = self._context.socket(zmq.DEALER)
        # identify ourselves to the Hub by session id
        self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
        if self._ssh:
            tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
        else:
            self._query_socket.connect(url)

        self.session.debug = self.debug

        # dispatch tables keyed by msg_type, consumed by the spin machinery
        self._notification_handlers = {'registration_notification' : self._register_engine,
                                    'unregistration_notification' : self._unregister_engine,
                                    'shutdown_notification' : lambda msg: self.close(),
                                    }
        self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
                                'apply_reply' : self._handle_apply_reply}
        self._connect(sshserver, ssh_kwargs, timeout)
394
394
    def __del__(self):
        """cleanup sockets, but _not_ context.

        The context may be the shared zmq.Context.instance() (see __init__),
        so terminating it is not this object's responsibility.
        """
        self.close()
398
398
399 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
399 def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
400 if ipython_dir is None:
400 if ipython_dir is None:
401 ipython_dir = get_ipython_dir()
401 ipython_dir = get_ipython_dir()
402 if profile_dir is not None:
402 if profile_dir is not None:
403 try:
403 try:
404 self._cd = ProfileDir.find_profile_dir(profile_dir)
404 self._cd = ProfileDir.find_profile_dir(profile_dir)
405 return
405 return
406 except ProfileDirError:
406 except ProfileDirError:
407 pass
407 pass
408 elif profile is not None:
408 elif profile is not None:
409 try:
409 try:
410 self._cd = ProfileDir.find_profile_dir_by_name(
410 self._cd = ProfileDir.find_profile_dir_by_name(
411 ipython_dir, profile)
411 ipython_dir, profile)
412 return
412 return
413 except ProfileDirError:
413 except ProfileDirError:
414 pass
414 pass
415 self._cd = None
415 self._cd = None
416
416
417 def _update_engines(self, engines):
417 def _update_engines(self, engines):
418 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
418 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
419 for k,v in engines.iteritems():
419 for k,v in engines.iteritems():
420 eid = int(k)
420 eid = int(k)
421 self._engines[eid] = v
421 self._engines[eid] = v
422 self._ids.append(eid)
422 self._ids.append(eid)
423 self._ids = sorted(self._ids)
423 self._ids = sorted(self._ids)
424 if sorted(self._engines.keys()) != range(len(self._engines)) and \
424 if sorted(self._engines.keys()) != range(len(self._engines)) and \
425 self._task_scheme == 'pure' and self._task_socket:
425 self._task_scheme == 'pure' and self._task_socket:
426 self._stop_scheduling_tasks()
426 self._stop_scheduling_tasks()
427
427
428 def _stop_scheduling_tasks(self):
428 def _stop_scheduling_tasks(self):
429 """Stop scheduling tasks because an engine has been unregistered
429 """Stop scheduling tasks because an engine has been unregistered
430 from a pure ZMQ scheduler.
430 from a pure ZMQ scheduler.
431 """
431 """
432 self._task_socket.close()
432 self._task_socket.close()
433 self._task_socket = None
433 self._task_socket = None
434 msg = "An engine has been unregistered, and we are using pure " +\
434 msg = "An engine has been unregistered, and we are using pure " +\
435 "ZMQ task scheduling. Task farming will be disabled."
435 "ZMQ task scheduling. Task farming will be disabled."
436 if self.outstanding:
436 if self.outstanding:
437 msg += " If you were running tasks when this happened, " +\
437 msg += " If you were running tasks when this happened, " +\
438 "some `outstanding` msg_ids may never resolve."
438 "some `outstanding` msg_ids may never resolve."
439 warnings.warn(msg, RuntimeWarning)
439 warnings.warn(msg, RuntimeWarning)
440
440
    def _build_targets(self, targets):
        """Turn valid target IDs or 'all' into two lists:
        (int_ids, uuids).

        Accepts None (all engines), 'all', a single int (negative indexes
        from the end of the current id list), a slice, or a collection of
        ints.  Raises NoEnginesRegistered, TypeError, or IndexError for
        invalid input.  Note: the returned tuple is (uuids, int_ids) --
        engine uuids first, despite the docstring order above.
        """
        if not self._ids:
            # flush notification socket if no engines yet, just in case
            # (accessing self.ids spins, which may register new engines)
            if not self.ids:
                raise error.NoEnginesRegistered("Can't build targets without any engines")

        if targets is None:
            targets = self._ids
        elif isinstance(targets, basestring):
            if targets.lower() == 'all':
                targets = self._ids
            else:
                raise TypeError("%r not valid str target, must be 'all'"%(targets))
        elif isinstance(targets, int):
            if targets < 0:
                # negative ids index from the end of the synced id list
                targets = self.ids[targets]
            if targets not in self._ids:
                raise IndexError("No such engine: %i"%targets)
            targets = [targets]

        if isinstance(targets, slice):
            # slice over positions in the id list, not over engine ids
            indices = range(len(self._ids))[targets]
            ids = self.ids
            targets = [ ids[i] for i in indices ]

        if not isinstance(targets, (tuple, list, xrange)):
            raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))

        return [cast_bytes(self._engines[t]) for t in targets], list(targets)
473
473
474 def _connect(self, sshserver, ssh_kwargs, timeout):
474 def _connect(self, sshserver, ssh_kwargs, timeout):
475 """setup all our socket connections to the cluster. This is called from
475 """setup all our socket connections to the cluster. This is called from
476 __init__."""
476 __init__."""
477
477
478 # Maybe allow reconnecting?
478 # Maybe allow reconnecting?
479 if self._connected:
479 if self._connected:
480 return
480 return
481 self._connected=True
481 self._connected=True
482
482
483 def connect_socket(s, url):
483 def connect_socket(s, url):
484 url = util.disambiguate_url(url, self._config['location'])
484 url = util.disambiguate_url(url, self._config['location'])
485 if self._ssh:
485 if self._ssh:
486 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
486 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
487 else:
487 else:
488 return s.connect(url)
488 return s.connect(url)
489
489
490 self.session.send(self._query_socket, 'connection_request')
490 self.session.send(self._query_socket, 'connection_request')
491 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
491 # use Poller because zmq.select has wrong units in pyzmq 2.1.7
492 poller = zmq.Poller()
492 poller = zmq.Poller()
493 poller.register(self._query_socket, zmq.POLLIN)
493 poller.register(self._query_socket, zmq.POLLIN)
494 # poll expects milliseconds, timeout is seconds
494 # poll expects milliseconds, timeout is seconds
495 evts = poller.poll(timeout*1000)
495 evts = poller.poll(timeout*1000)
496 if not evts:
496 if not evts:
497 raise error.TimeoutError("Hub connection request timed out")
497 raise error.TimeoutError("Hub connection request timed out")
498 idents,msg = self.session.recv(self._query_socket,mode=0)
498 idents,msg = self.session.recv(self._query_socket,mode=0)
499 if self.debug:
499 if self.debug:
500 pprint(msg)
500 pprint(msg)
501 msg = Message(msg)
501 msg = Message(msg)
502 content = msg.content
502 content = msg.content
503 self._config['registration'] = dict(content)
503 self._config['registration'] = dict(content)
504 if content.status == 'ok':
504 if content.status == 'ok':
505 ident = self.session.bsession
505 ident = self.session.bsession
506 if content.mux:
506 if content.mux:
507 self._mux_socket = self._context.socket(zmq.DEALER)
507 self._mux_socket = self._context.socket(zmq.DEALER)
508 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
508 self._mux_socket.setsockopt(zmq.IDENTITY, ident)
509 connect_socket(self._mux_socket, content.mux)
509 connect_socket(self._mux_socket, content.mux)
510 if content.task:
510 if content.task:
511 self._task_scheme, task_addr = content.task
511 self._task_scheme, task_addr = content.task
512 self._task_socket = self._context.socket(zmq.DEALER)
512 self._task_socket = self._context.socket(zmq.DEALER)
513 self._task_socket.setsockopt(zmq.IDENTITY, ident)
513 self._task_socket.setsockopt(zmq.IDENTITY, ident)
514 connect_socket(self._task_socket, task_addr)
514 connect_socket(self._task_socket, task_addr)
515 if content.notification:
515 if content.notification:
516 self._notification_socket = self._context.socket(zmq.SUB)
516 self._notification_socket = self._context.socket(zmq.SUB)
517 connect_socket(self._notification_socket, content.notification)
517 connect_socket(self._notification_socket, content.notification)
518 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
518 self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
519 # if content.query:
519 # if content.query:
520 # self._query_socket = self._context.socket(zmq.DEALER)
520 # self._query_socket = self._context.socket(zmq.DEALER)
521 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
521 # self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
522 # connect_socket(self._query_socket, content.query)
522 # connect_socket(self._query_socket, content.query)
523 if content.control:
523 if content.control:
524 self._control_socket = self._context.socket(zmq.DEALER)
524 self._control_socket = self._context.socket(zmq.DEALER)
525 self._control_socket.setsockopt(zmq.IDENTITY, ident)
525 self._control_socket.setsockopt(zmq.IDENTITY, ident)
526 connect_socket(self._control_socket, content.control)
526 connect_socket(self._control_socket, content.control)
527 if content.iopub:
527 if content.iopub:
528 self._iopub_socket = self._context.socket(zmq.SUB)
528 self._iopub_socket = self._context.socket(zmq.SUB)
529 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
529 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
530 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
530 self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
531 connect_socket(self._iopub_socket, content.iopub)
531 connect_socket(self._iopub_socket, content.iopub)
532 self._update_engines(dict(content.engines))
532 self._update_engines(dict(content.engines))
533 else:
533 else:
534 self._connected = False
534 self._connected = False
535 raise Exception("Failed to connect!")
535 raise Exception("Failed to connect!")
536
536
537 #--------------------------------------------------------------------------
537 #--------------------------------------------------------------------------
538 # handlers and callbacks for incoming messages
538 # handlers and callbacks for incoming messages
539 #--------------------------------------------------------------------------
539 #--------------------------------------------------------------------------
540
540
541 def _unwrap_exception(self, content):
541 def _unwrap_exception(self, content):
542 """unwrap exception, and remap engine_id to int."""
542 """unwrap exception, and remap engine_id to int."""
543 e = error.unwrap_exception(content)
543 e = error.unwrap_exception(content)
544 # print e.traceback
544 # print e.traceback
545 if e.engine_info:
545 if e.engine_info:
546 e_uuid = e.engine_info['engine_uuid']
546 e_uuid = e.engine_info['engine_uuid']
547 eid = self._engines[e_uuid]
547 eid = self._engines[e_uuid]
548 e.engine_info['engine_id'] = eid
548 e.engine_info['engine_id'] = eid
549 return e
549 return e
550
550
551 def _extract_metadata(self, header, parent, content):
551 def _extract_metadata(self, header, parent, content):
552 md = {'msg_id' : parent['msg_id'],
552 md = {'msg_id' : parent['msg_id'],
553 'received' : datetime.now(),
553 'received' : datetime.now(),
554 'engine_uuid' : header.get('engine', None),
554 'engine_uuid' : header.get('engine', None),
555 'follow' : parent.get('follow', []),
555 'follow' : parent.get('follow', []),
556 'after' : parent.get('after', []),
556 'after' : parent.get('after', []),
557 'status' : content['status'],
557 'status' : content['status'],
558 }
558 }
559
559
560 if md['engine_uuid'] is not None:
560 if md['engine_uuid'] is not None:
561 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
561 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
562
562
563 if 'date' in parent:
563 if 'date' in parent:
564 md['submitted'] = parent['date']
564 md['submitted'] = parent['date']
565 if 'started' in header:
565 if 'started' in header:
566 md['started'] = header['started']
566 md['started'] = header['started']
567 if 'date' in header:
567 if 'date' in header:
568 md['completed'] = header['date']
568 md['completed'] = header['date']
569 return md
569 return md
570
570
571 def _register_engine(self, msg):
571 def _register_engine(self, msg):
572 """Register a new engine, and update our connection info."""
572 """Register a new engine, and update our connection info."""
573 content = msg['content']
573 content = msg['content']
574 eid = content['id']
574 eid = content['id']
575 d = {eid : content['queue']}
575 d = {eid : content['queue']}
576 self._update_engines(d)
576 self._update_engines(d)
577
577
578 def _unregister_engine(self, msg):
578 def _unregister_engine(self, msg):
579 """Unregister an engine that has died."""
579 """Unregister an engine that has died."""
580 content = msg['content']
580 content = msg['content']
581 eid = int(content['id'])
581 eid = int(content['id'])
582 if eid in self._ids:
582 if eid in self._ids:
583 self._ids.remove(eid)
583 self._ids.remove(eid)
584 uuid = self._engines.pop(eid)
584 uuid = self._engines.pop(eid)
585
585
586 self._handle_stranded_msgs(eid, uuid)
586 self._handle_stranded_msgs(eid, uuid)
587
587
588 if self._task_socket and self._task_scheme == 'pure':
588 if self._task_socket and self._task_scheme == 'pure':
589 self._stop_scheduling_tasks()
589 self._stop_scheduling_tasks()
590
590
591 def _handle_stranded_msgs(self, eid, uuid):
591 def _handle_stranded_msgs(self, eid, uuid):
592 """Handle messages known to be on an engine when the engine unregisters.
592 """Handle messages known to be on an engine when the engine unregisters.
593
593
594 It is possible that this will fire prematurely - that is, an engine will
594 It is possible that this will fire prematurely - that is, an engine will
595 go down after completing a result, and the client will be notified
595 go down after completing a result, and the client will be notified
596 of the unregistration and later receive the successful result.
596 of the unregistration and later receive the successful result.
597 """
597 """
598
598
599 outstanding = self._outstanding_dict[uuid]
599 outstanding = self._outstanding_dict[uuid]
600
600
601 for msg_id in list(outstanding):
601 for msg_id in list(outstanding):
602 if msg_id in self.results:
602 if msg_id in self.results:
603 # we already
603 # we already
604 continue
604 continue
605 try:
605 try:
606 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
606 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
607 except:
607 except:
608 content = error.wrap_exception()
608 content = error.wrap_exception()
609 # build a fake message:
609 # build a fake message:
610 parent = {}
610 parent = {}
611 header = {}
611 header = {}
612 parent['msg_id'] = msg_id
612 parent['msg_id'] = msg_id
613 header['engine'] = uuid
613 header['engine'] = uuid
614 header['date'] = datetime.now()
614 header['date'] = datetime.now()
615 msg = dict(parent_header=parent, header=header, content=content)
615 msg = dict(parent_header=parent, header=header, content=content)
616 self._handle_apply_reply(msg)
616 self._handle_apply_reply(msg)
617
617
618 def _handle_execute_reply(self, msg):
618 def _handle_execute_reply(self, msg):
619 """Save the reply to an execute_request into our results.
619 """Save the reply to an execute_request into our results.
620
620
621 execute messages are never actually used. apply is used instead.
621 execute messages are never actually used. apply is used instead.
622 """
622 """
623
623
624 parent = msg['parent_header']
624 parent = msg['parent_header']
625 msg_id = parent['msg_id']
625 msg_id = parent['msg_id']
626 if msg_id not in self.outstanding:
626 if msg_id not in self.outstanding:
627 if msg_id in self.history:
627 if msg_id in self.history:
628 print ("got stale result: %s"%msg_id)
628 print ("got stale result: %s"%msg_id)
629 else:
629 else:
630 print ("got unknown result: %s"%msg_id)
630 print ("got unknown result: %s"%msg_id)
631 else:
631 else:
632 self.outstanding.remove(msg_id)
632 self.outstanding.remove(msg_id)
633
633
634 content = msg['content']
634 content = msg['content']
635 header = msg['header']
635 header = msg['header']
636
636
637 # construct metadata:
637 # construct metadata:
638 md = self.metadata[msg_id]
638 md = self.metadata[msg_id]
639 md.update(self._extract_metadata(header, parent, content))
639 md.update(self._extract_metadata(header, parent, content))
640 # is this redundant?
640 # is this redundant?
641 self.metadata[msg_id] = md
641 self.metadata[msg_id] = md
642
642
643 e_outstanding = self._outstanding_dict[md['engine_uuid']]
643 e_outstanding = self._outstanding_dict[md['engine_uuid']]
644 if msg_id in e_outstanding:
644 if msg_id in e_outstanding:
645 e_outstanding.remove(msg_id)
645 e_outstanding.remove(msg_id)
646
646
647 # construct result:
647 # construct result:
648 if content['status'] == 'ok':
648 if content['status'] == 'ok':
649 self.results[msg_id] = content
649 self.results[msg_id] = content
650 elif content['status'] == 'aborted':
650 elif content['status'] == 'aborted':
651 self.results[msg_id] = error.TaskAborted(msg_id)
651 self.results[msg_id] = error.TaskAborted(msg_id)
652 elif content['status'] == 'resubmitted':
652 elif content['status'] == 'resubmitted':
653 # TODO: handle resubmission
653 # TODO: handle resubmission
654 pass
654 pass
655 else:
655 else:
656 self.results[msg_id] = self._unwrap_exception(content)
656 self.results[msg_id] = self._unwrap_exception(content)
657
657
658 def _handle_apply_reply(self, msg):
658 def _handle_apply_reply(self, msg):
659 """Save the reply to an apply_request into our results."""
659 """Save the reply to an apply_request into our results."""
660 parent = msg['parent_header']
660 parent = msg['parent_header']
661 msg_id = parent['msg_id']
661 msg_id = parent['msg_id']
662 if msg_id not in self.outstanding:
662 if msg_id not in self.outstanding:
663 if msg_id in self.history:
663 if msg_id in self.history:
664 print ("got stale result: %s"%msg_id)
664 print ("got stale result: %s"%msg_id)
665 print self.results[msg_id]
665 print self.results[msg_id]
666 print msg
666 print msg
667 else:
667 else:
668 print ("got unknown result: %s"%msg_id)
668 print ("got unknown result: %s"%msg_id)
669 else:
669 else:
670 self.outstanding.remove(msg_id)
670 self.outstanding.remove(msg_id)
671 content = msg['content']
671 content = msg['content']
672 header = msg['header']
672 header = msg['header']
673
673
674 # construct metadata:
674 # construct metadata:
675 md = self.metadata[msg_id]
675 md = self.metadata[msg_id]
676 md.update(self._extract_metadata(header, parent, content))
676 md.update(self._extract_metadata(header, parent, content))
677 # is this redundant?
677 # is this redundant?
678 self.metadata[msg_id] = md
678 self.metadata[msg_id] = md
679
679
680 e_outstanding = self._outstanding_dict[md['engine_uuid']]
680 e_outstanding = self._outstanding_dict[md['engine_uuid']]
681 if msg_id in e_outstanding:
681 if msg_id in e_outstanding:
682 e_outstanding.remove(msg_id)
682 e_outstanding.remove(msg_id)
683
683
684 # construct result:
684 # construct result:
685 if content['status'] == 'ok':
685 if content['status'] == 'ok':
686 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
686 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
687 elif content['status'] == 'aborted':
687 elif content['status'] == 'aborted':
688 self.results[msg_id] = error.TaskAborted(msg_id)
688 self.results[msg_id] = error.TaskAborted(msg_id)
689 elif content['status'] == 'resubmitted':
689 elif content['status'] == 'resubmitted':
690 # TODO: handle resubmission
690 # TODO: handle resubmission
691 pass
691 pass
692 else:
692 else:
693 self.results[msg_id] = self._unwrap_exception(content)
693 self.results[msg_id] = self._unwrap_exception(content)
694
694
695 def _flush_notifications(self):
695 def _flush_notifications(self):
696 """Flush notifications of engine registrations waiting
696 """Flush notifications of engine registrations waiting
697 in ZMQ queue."""
697 in ZMQ queue."""
698 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
698 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
699 while msg is not None:
699 while msg is not None:
700 if self.debug:
700 if self.debug:
701 pprint(msg)
701 pprint(msg)
702 msg_type = msg['header']['msg_type']
702 msg_type = msg['header']['msg_type']
703 handler = self._notification_handlers.get(msg_type, None)
703 handler = self._notification_handlers.get(msg_type, None)
704 if handler is None:
704 if handler is None:
705 raise Exception("Unhandled message type: %s"%msg.msg_type)
705 raise Exception("Unhandled message type: %s"%msg.msg_type)
706 else:
706 else:
707 handler(msg)
707 handler(msg)
708 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
708 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
709
709
710 def _flush_results(self, sock):
710 def _flush_results(self, sock):
711 """Flush task or queue results waiting in ZMQ queue."""
711 """Flush task or queue results waiting in ZMQ queue."""
712 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
712 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
713 while msg is not None:
713 while msg is not None:
714 if self.debug:
714 if self.debug:
715 pprint(msg)
715 pprint(msg)
716 msg_type = msg['header']['msg_type']
716 msg_type = msg['header']['msg_type']
717 handler = self._queue_handlers.get(msg_type, None)
717 handler = self._queue_handlers.get(msg_type, None)
718 if handler is None:
718 if handler is None:
719 raise Exception("Unhandled message type: %s"%msg.msg_type)
719 raise Exception("Unhandled message type: %s"%msg.msg_type)
720 else:
720 else:
721 handler(msg)
721 handler(msg)
722 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
722 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
723
723
724 def _flush_control(self, sock):
724 def _flush_control(self, sock):
725 """Flush replies from the control channel waiting
725 """Flush replies from the control channel waiting
726 in the ZMQ queue.
726 in the ZMQ queue.
727
727
728 Currently: ignore them."""
728 Currently: ignore them."""
729 if self._ignored_control_replies <= 0:
729 if self._ignored_control_replies <= 0:
730 return
730 return
731 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
731 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
732 while msg is not None:
732 while msg is not None:
733 self._ignored_control_replies -= 1
733 self._ignored_control_replies -= 1
734 if self.debug:
734 if self.debug:
735 pprint(msg)
735 pprint(msg)
736 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
736 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
737
737
738 def _flush_ignored_control(self):
738 def _flush_ignored_control(self):
739 """flush ignored control replies"""
739 """flush ignored control replies"""
740 while self._ignored_control_replies > 0:
740 while self._ignored_control_replies > 0:
741 self.session.recv(self._control_socket)
741 self.session.recv(self._control_socket)
742 self._ignored_control_replies -= 1
742 self._ignored_control_replies -= 1
743
743
744 def _flush_ignored_hub_replies(self):
744 def _flush_ignored_hub_replies(self):
745 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
745 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
746 while msg is not None:
746 while msg is not None:
747 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
747 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
748
748
749 def _flush_iopub(self, sock):
749 def _flush_iopub(self, sock):
750 """Flush replies from the iopub channel waiting
750 """Flush replies from the iopub channel waiting
751 in the ZMQ queue.
751 in the ZMQ queue.
752 """
752 """
753 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
753 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
754 while msg is not None:
754 while msg is not None:
755 if self.debug:
755 if self.debug:
756 pprint(msg)
756 pprint(msg)
757 parent = msg['parent_header']
757 parent = msg['parent_header']
758 # ignore IOPub messages with no parent.
758 # ignore IOPub messages with no parent.
759 # Caused by print statements or warnings from before the first execution.
759 # Caused by print statements or warnings from before the first execution.
760 if not parent:
760 if not parent:
761 continue
761 continue
762 msg_id = parent['msg_id']
762 msg_id = parent['msg_id']
763 content = msg['content']
763 content = msg['content']
764 header = msg['header']
764 header = msg['header']
765 msg_type = msg['header']['msg_type']
765 msg_type = msg['header']['msg_type']
766
766
767 # init metadata:
767 # init metadata:
768 md = self.metadata[msg_id]
768 md = self.metadata[msg_id]
769
769
770 if msg_type == 'stream':
770 if msg_type == 'stream':
771 name = content['name']
771 name = content['name']
772 s = md[name] or ''
772 s = md[name] or ''
773 md[name] = s + content['data']
773 md[name] = s + content['data']
774 elif msg_type == 'pyerr':
774 elif msg_type == 'pyerr':
775 md.update({'pyerr' : self._unwrap_exception(content)})
775 md.update({'pyerr' : self._unwrap_exception(content)})
776 elif msg_type == 'pyin':
776 elif msg_type == 'pyin':
777 md.update({'pyin' : content['code']})
777 md.update({'pyin' : content['code']})
778 elif msg_type == 'display_data':
778 elif msg_type == 'display_data':
779 md['outputs'].append(content.get('data'))
779 md['outputs'].append(content.get('data'))
780 elif msg_type == 'pyout':
780 elif msg_type == 'pyout':
781 md['pyout'] = content.get('data')
781 md['pyout'] = content.get('data')
782 else:
782 else:
783 # unhandled msg_type (status, etc.)
783 # unhandled msg_type (status, etc.)
784 pass
784 pass
785
785
786 # reduntant?
786 # reduntant?
787 self.metadata[msg_id] = md
787 self.metadata[msg_id] = md
788
788
789 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
789 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
790
790
791 #--------------------------------------------------------------------------
791 #--------------------------------------------------------------------------
792 # len, getitem
792 # len, getitem
793 #--------------------------------------------------------------------------
793 #--------------------------------------------------------------------------
794
794
795 def __len__(self):
795 def __len__(self):
796 """len(client) returns # of engines."""
796 """len(client) returns # of engines."""
797 return len(self.ids)
797 return len(self.ids)
798
798
799 def __getitem__(self, key):
799 def __getitem__(self, key):
800 """index access returns DirectView multiplexer objects
800 """index access returns DirectView multiplexer objects
801
801
802 Must be int, slice, or list/tuple/xrange of ints"""
802 Must be int, slice, or list/tuple/xrange of ints"""
803 if not isinstance(key, (int, slice, tuple, list, xrange)):
803 if not isinstance(key, (int, slice, tuple, list, xrange)):
804 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
804 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
805 else:
805 else:
806 return self.direct_view(key)
806 return self.direct_view(key)
807
807
808 #--------------------------------------------------------------------------
808 #--------------------------------------------------------------------------
809 # Begin public methods
809 # Begin public methods
810 #--------------------------------------------------------------------------
810 #--------------------------------------------------------------------------
811
811
812 @property
812 @property
813 def ids(self):
813 def ids(self):
814 """Always up-to-date ids property."""
814 """Always up-to-date ids property."""
815 self._flush_notifications()
815 self._flush_notifications()
816 # always copy:
816 # always copy:
817 return list(self._ids)
817 return list(self._ids)
818
818
819 def close(self):
819 def close(self):
820 if self._closed:
820 if self._closed:
821 return
821 return
822 self.stop_spin_thread()
822 self.stop_spin_thread()
823 snames = filter(lambda n: n.endswith('socket'), dir(self))
823 snames = filter(lambda n: n.endswith('socket'), dir(self))
824 for socket in map(lambda name: getattr(self, name), snames):
824 for socket in map(lambda name: getattr(self, name), snames):
825 if isinstance(socket, zmq.Socket) and not socket.closed:
825 if isinstance(socket, zmq.Socket) and not socket.closed:
826 socket.close()
826 socket.close()
827 self._closed = True
827 self._closed = True
828
828
829 def _spin_every(self, interval=1):
829 def _spin_every(self, interval=1):
830 """target func for use in spin_thread"""
830 """target func for use in spin_thread"""
831 while True:
831 while True:
832 if self._stop_spinning.is_set():
832 if self._stop_spinning.is_set():
833 return
833 return
834 time.sleep(interval)
834 time.sleep(interval)
835 self.spin()
835 self.spin()
836
836
837 def spin_thread(self, interval=1):
837 def spin_thread(self, interval=1):
838 """call Client.spin() in a background thread on some regular interval
838 """call Client.spin() in a background thread on some regular interval
839
839
840 This helps ensure that messages don't pile up too much in the zmq queue
840 This helps ensure that messages don't pile up too much in the zmq queue
841 while you are working on other things, or just leaving an idle terminal.
841 while you are working on other things, or just leaving an idle terminal.
842
842
843 It also helps limit potential padding of the `received` timestamp
843 It also helps limit potential padding of the `received` timestamp
844 on AsyncResult objects, used for timings.
844 on AsyncResult objects, used for timings.
845
845
846 Parameters
846 Parameters
847 ----------
847 ----------
848
848
849 interval : float, optional
849 interval : float, optional
850 The interval on which to spin the client in the background thread
850 The interval on which to spin the client in the background thread
851 (simply passed to time.sleep).
851 (simply passed to time.sleep).
852
852
853 Notes
853 Notes
854 -----
854 -----
855
855
856 For precision timing, you may want to use this method to put a bound
856 For precision timing, you may want to use this method to put a bound
857 on the jitter (in seconds) in `received` timestamps used
857 on the jitter (in seconds) in `received` timestamps used
858 in AsyncResult.wall_time.
858 in AsyncResult.wall_time.
859
859
860 """
860 """
861 if self._spin_thread is not None:
861 if self._spin_thread is not None:
862 self.stop_spin_thread()
862 self.stop_spin_thread()
863 self._stop_spinning.clear()
863 self._stop_spinning.clear()
864 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
864 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
865 self._spin_thread.daemon = True
865 self._spin_thread.daemon = True
866 self._spin_thread.start()
866 self._spin_thread.start()
867
867
868 def stop_spin_thread(self):
868 def stop_spin_thread(self):
869 """stop background spin_thread, if any"""
869 """stop background spin_thread, if any"""
870 if self._spin_thread is not None:
870 if self._spin_thread is not None:
871 self._stop_spinning.set()
871 self._stop_spinning.set()
872 self._spin_thread.join()
872 self._spin_thread.join()
873 self._spin_thread = None
873 self._spin_thread = None
874
874
875 def spin(self):
875 def spin(self):
876 """Flush any registration notifications and execution results
876 """Flush any registration notifications and execution results
877 waiting in the ZMQ queue.
877 waiting in the ZMQ queue.
878 """
878 """
879 if self._notification_socket:
879 if self._notification_socket:
880 self._flush_notifications()
880 self._flush_notifications()
881 if self._mux_socket:
881 if self._mux_socket:
882 self._flush_results(self._mux_socket)
882 self._flush_results(self._mux_socket)
883 if self._task_socket:
883 if self._task_socket:
884 self._flush_results(self._task_socket)
884 self._flush_results(self._task_socket)
885 if self._control_socket:
885 if self._control_socket:
886 self._flush_control(self._control_socket)
886 self._flush_control(self._control_socket)
887 if self._iopub_socket:
887 if self._iopub_socket:
888 self._flush_iopub(self._iopub_socket)
888 self._flush_iopub(self._iopub_socket)
889 if self._query_socket:
889 if self._query_socket:
890 self._flush_ignored_hub_replies()
890 self._flush_ignored_hub_replies()
891
891
892 def wait(self, jobs=None, timeout=-1):
892 def wait(self, jobs=None, timeout=-1):
893 """waits on one or more `jobs`, for up to `timeout` seconds.
893 """waits on one or more `jobs`, for up to `timeout` seconds.
894
894
895 Parameters
895 Parameters
896 ----------
896 ----------
897
897
898 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
898 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
899 ints are indices to self.history
899 ints are indices to self.history
900 strs are msg_ids
900 strs are msg_ids
901 default: wait on all outstanding messages
901 default: wait on all outstanding messages
902 timeout : float
902 timeout : float
903 a time in seconds, after which to give up.
903 a time in seconds, after which to give up.
904 default is -1, which means no timeout
904 default is -1, which means no timeout
905
905
906 Returns
906 Returns
907 -------
907 -------
908
908
909 True : when all msg_ids are done
909 True : when all msg_ids are done
910 False : timeout reached, some msg_ids still outstanding
910 False : timeout reached, some msg_ids still outstanding
911 """
911 """
912 tic = time.time()
912 tic = time.time()
913 if jobs is None:
913 if jobs is None:
914 theids = self.outstanding
914 theids = self.outstanding
915 else:
915 else:
916 if isinstance(jobs, (int, basestring, AsyncResult)):
916 if isinstance(jobs, (int, basestring, AsyncResult)):
917 jobs = [jobs]
917 jobs = [jobs]
918 theids = set()
918 theids = set()
919 for job in jobs:
919 for job in jobs:
920 if isinstance(job, int):
920 if isinstance(job, int):
921 # index access
921 # index access
922 job = self.history[job]
922 job = self.history[job]
923 elif isinstance(job, AsyncResult):
923 elif isinstance(job, AsyncResult):
924 map(theids.add, job.msg_ids)
924 map(theids.add, job.msg_ids)
925 continue
925 continue
926 theids.add(job)
926 theids.add(job)
927 if not theids.intersection(self.outstanding):
927 if not theids.intersection(self.outstanding):
928 return True
928 return True
929 self.spin()
929 self.spin()
930 while theids.intersection(self.outstanding):
930 while theids.intersection(self.outstanding):
931 if timeout >= 0 and ( time.time()-tic ) > timeout:
931 if timeout >= 0 and ( time.time()-tic ) > timeout:
932 break
932 break
933 time.sleep(1e-3)
933 time.sleep(1e-3)
934 self.spin()
934 self.spin()
935 return len(theids.intersection(self.outstanding)) == 0
935 return len(theids.intersection(self.outstanding)) == 0
936
936
937 #--------------------------------------------------------------------------
937 #--------------------------------------------------------------------------
938 # Control methods
938 # Control methods
939 #--------------------------------------------------------------------------
939 #--------------------------------------------------------------------------
940
940
941 @spin_first
941 @spin_first
942 def clear(self, targets=None, block=None):
942 def clear(self, targets=None, block=None):
943 """Clear the namespace in target(s)."""
943 """Clear the namespace in target(s)."""
944 block = self.block if block is None else block
944 block = self.block if block is None else block
945 targets = self._build_targets(targets)[0]
945 targets = self._build_targets(targets)[0]
946 for t in targets:
946 for t in targets:
947 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
947 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
948 error = False
948 error = False
949 if block:
949 if block:
950 self._flush_ignored_control()
950 self._flush_ignored_control()
951 for i in range(len(targets)):
951 for i in range(len(targets)):
952 idents,msg = self.session.recv(self._control_socket,0)
952 idents,msg = self.session.recv(self._control_socket,0)
953 if self.debug:
953 if self.debug:
954 pprint(msg)
954 pprint(msg)
955 if msg['content']['status'] != 'ok':
955 if msg['content']['status'] != 'ok':
956 error = self._unwrap_exception(msg['content'])
956 error = self._unwrap_exception(msg['content'])
957 else:
957 else:
958 self._ignored_control_replies += len(targets)
958 self._ignored_control_replies += len(targets)
959 if error:
959 if error:
960 raise error
960 raise error
961
961
962
962
963 @spin_first
963 @spin_first
964 def abort(self, jobs=None, targets=None, block=None):
964 def abort(self, jobs=None, targets=None, block=None):
965 """Abort specific jobs from the execution queues of target(s).
965 """Abort specific jobs from the execution queues of target(s).
966
966
967 This is a mechanism to prevent jobs that have already been submitted
967 This is a mechanism to prevent jobs that have already been submitted
968 from executing.
968 from executing.
969
969
970 Parameters
970 Parameters
971 ----------
971 ----------
972
972
973 jobs : msg_id, list of msg_ids, or AsyncResult
973 jobs : msg_id, list of msg_ids, or AsyncResult
974 The jobs to be aborted
974 The jobs to be aborted
975
975
976 If unspecified/None: abort all outstanding jobs.
976 If unspecified/None: abort all outstanding jobs.
977
977
978 """
978 """
979 block = self.block if block is None else block
979 block = self.block if block is None else block
980 jobs = jobs if jobs is not None else list(self.outstanding)
980 jobs = jobs if jobs is not None else list(self.outstanding)
981 targets = self._build_targets(targets)[0]
981 targets = self._build_targets(targets)[0]
982
982
983 msg_ids = []
983 msg_ids = []
984 if isinstance(jobs, (basestring,AsyncResult)):
984 if isinstance(jobs, (basestring,AsyncResult)):
985 jobs = [jobs]
985 jobs = [jobs]
986 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
986 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
987 if bad_ids:
987 if bad_ids:
988 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
988 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
989 for j in jobs:
989 for j in jobs:
990 if isinstance(j, AsyncResult):
990 if isinstance(j, AsyncResult):
991 msg_ids.extend(j.msg_ids)
991 msg_ids.extend(j.msg_ids)
992 else:
992 else:
993 msg_ids.append(j)
993 msg_ids.append(j)
994 content = dict(msg_ids=msg_ids)
994 content = dict(msg_ids=msg_ids)
995 for t in targets:
995 for t in targets:
996 self.session.send(self._control_socket, 'abort_request',
996 self.session.send(self._control_socket, 'abort_request',
997 content=content, ident=t)
997 content=content, ident=t)
998 error = False
998 error = False
999 if block:
999 if block:
1000 self._flush_ignored_control()
1000 self._flush_ignored_control()
1001 for i in range(len(targets)):
1001 for i in range(len(targets)):
1002 idents,msg = self.session.recv(self._control_socket,0)
1002 idents,msg = self.session.recv(self._control_socket,0)
1003 if self.debug:
1003 if self.debug:
1004 pprint(msg)
1004 pprint(msg)
1005 if msg['content']['status'] != 'ok':
1005 if msg['content']['status'] != 'ok':
1006 error = self._unwrap_exception(msg['content'])
1006 error = self._unwrap_exception(msg['content'])
1007 else:
1007 else:
1008 self._ignored_control_replies += len(targets)
1008 self._ignored_control_replies += len(targets)
1009 if error:
1009 if error:
1010 raise error
1010 raise error
1011
1011
1012 @spin_first
1012 @spin_first
1013 def shutdown(self, targets=None, restart=False, hub=False, block=None):
1013 def shutdown(self, targets=None, restart=False, hub=False, block=None):
1014 """Terminates one or more engine processes, optionally including the hub."""
1014 """Terminates one or more engine processes, optionally including the hub."""
1015 block = self.block if block is None else block
1015 block = self.block if block is None else block
1016 if hub:
1016 if hub:
1017 targets = 'all'
1017 targets = 'all'
1018 targets = self._build_targets(targets)[0]
1018 targets = self._build_targets(targets)[0]
1019 for t in targets:
1019 for t in targets:
1020 self.session.send(self._control_socket, 'shutdown_request',
1020 self.session.send(self._control_socket, 'shutdown_request',
1021 content={'restart':restart},ident=t)
1021 content={'restart':restart},ident=t)
1022 error = False
1022 error = False
1023 if block or hub:
1023 if block or hub:
1024 self._flush_ignored_control()
1024 self._flush_ignored_control()
1025 for i in range(len(targets)):
1025 for i in range(len(targets)):
1026 idents,msg = self.session.recv(self._control_socket, 0)
1026 idents,msg = self.session.recv(self._control_socket, 0)
1027 if self.debug:
1027 if self.debug:
1028 pprint(msg)
1028 pprint(msg)
1029 if msg['content']['status'] != 'ok':
1029 if msg['content']['status'] != 'ok':
1030 error = self._unwrap_exception(msg['content'])
1030 error = self._unwrap_exception(msg['content'])
1031 else:
1031 else:
1032 self._ignored_control_replies += len(targets)
1032 self._ignored_control_replies += len(targets)
1033
1033
1034 if hub:
1034 if hub:
1035 time.sleep(0.25)
1035 time.sleep(0.25)
1036 self.session.send(self._query_socket, 'shutdown_request')
1036 self.session.send(self._query_socket, 'shutdown_request')
1037 idents,msg = self.session.recv(self._query_socket, 0)
1037 idents,msg = self.session.recv(self._query_socket, 0)
1038 if self.debug:
1038 if self.debug:
1039 pprint(msg)
1039 pprint(msg)
1040 if msg['content']['status'] != 'ok':
1040 if msg['content']['status'] != 'ok':
1041 error = self._unwrap_exception(msg['content'])
1041 error = self._unwrap_exception(msg['content'])
1042
1042
1043 if error:
1043 if error:
1044 raise error
1044 raise error
1045
1045
1046 #--------------------------------------------------------------------------
1046 #--------------------------------------------------------------------------
1047 # Execution related methods
1047 # Execution related methods
1048 #--------------------------------------------------------------------------
1048 #--------------------------------------------------------------------------
1049
1049
1050 def _maybe_raise(self, result):
1050 def _maybe_raise(self, result):
1051 """wrapper for maybe raising an exception if apply failed."""
1051 """wrapper for maybe raising an exception if apply failed."""
1052 if isinstance(result, error.RemoteError):
1052 if isinstance(result, error.RemoteError):
1053 raise result
1053 raise result
1054
1054
1055 return result
1055 return result
1056
1056
1057 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1057 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1058 ident=None):
1058 ident=None):
1059 """construct and send an apply message via a socket.
1059 """construct and send an apply message via a socket.
1060
1060
1061 This is the principal method with which all engine execution is performed by views.
1061 This is the principal method with which all engine execution is performed by views.
1062 """
1062 """
1063
1063
1064 assert not self._closed, "cannot use me anymore, I'm closed!"
1064 assert not self._closed, "cannot use me anymore, I'm closed!"
1065 # defaults:
1065 # defaults:
1066 args = args if args is not None else []
1066 args = args if args is not None else []
1067 kwargs = kwargs if kwargs is not None else {}
1067 kwargs = kwargs if kwargs is not None else {}
1068 subheader = subheader if subheader is not None else {}
1068 subheader = subheader if subheader is not None else {}
1069
1069
1070 # validate arguments
1070 # validate arguments
1071 if not callable(f) and not isinstance(f, Reference):
1071 if not callable(f) and not isinstance(f, Reference):
1072 raise TypeError("f must be callable, not %s"%type(f))
1072 raise TypeError("f must be callable, not %s"%type(f))
1073 if not isinstance(args, (tuple, list)):
1073 if not isinstance(args, (tuple, list)):
1074 raise TypeError("args must be tuple or list, not %s"%type(args))
1074 raise TypeError("args must be tuple or list, not %s"%type(args))
1075 if not isinstance(kwargs, dict):
1075 if not isinstance(kwargs, dict):
1076 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1076 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1077 if not isinstance(subheader, dict):
1077 if not isinstance(subheader, dict):
1078 raise TypeError("subheader must be dict, not %s"%type(subheader))
1078 raise TypeError("subheader must be dict, not %s"%type(subheader))
1079
1079
1080 bufs = util.pack_apply_message(f,args,kwargs)
1080 bufs = util.pack_apply_message(f,args,kwargs)
1081
1081
1082 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1082 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1083 subheader=subheader, track=track)
1083 subheader=subheader, track=track)
1084
1084
1085 msg_id = msg['header']['msg_id']
1085 msg_id = msg['header']['msg_id']
1086 self.outstanding.add(msg_id)
1086 self.outstanding.add(msg_id)
1087 if ident:
1087 if ident:
1088 # possibly routed to a specific engine
1088 # possibly routed to a specific engine
1089 if isinstance(ident, list):
1089 if isinstance(ident, list):
1090 ident = ident[-1]
1090 ident = ident[-1]
1091 if ident in self._engines.values():
1091 if ident in self._engines.values():
1092 # save for later, in case of engine death
1092 # save for later, in case of engine death
1093 self._outstanding_dict[ident].add(msg_id)
1093 self._outstanding_dict[ident].add(msg_id)
1094 self.history.append(msg_id)
1094 self.history.append(msg_id)
1095 self.metadata[msg_id]['submitted'] = datetime.now()
1095 self.metadata[msg_id]['submitted'] = datetime.now()
1096
1096
1097 return msg
1097 return msg
1098
1098
1099 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1099 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1100 """construct and send an execute request via a socket.
1100 """construct and send an execute request via a socket.
1101
1101
1102 """
1102 """
1103
1103
1104 assert not self._closed, "cannot use me anymore, I'm closed!"
1104 assert not self._closed, "cannot use me anymore, I'm closed!"
1105 # defaults:
1105 # defaults:
1106 subheader = subheader if subheader is not None else {}
1106 subheader = subheader if subheader is not None else {}
1107
1107
1108 # validate arguments
1108 # validate arguments
1109 if not isinstance(code, basestring):
1109 if not isinstance(code, basestring):
1110 raise TypeError("code must be text, not %s" % type(code))
1110 raise TypeError("code must be text, not %s" % type(code))
1111 if not isinstance(subheader, dict):
1111 if not isinstance(subheader, dict):
1112 raise TypeError("subheader must be dict, not %s" % type(subheader))
1112 raise TypeError("subheader must be dict, not %s" % type(subheader))
1113
1113
1114 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1114 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1115
1115
1116
1116
1117 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1117 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1118 subheader=subheader)
1118 subheader=subheader)
1119
1119
1120 msg_id = msg['header']['msg_id']
1120 msg_id = msg['header']['msg_id']
1121 self.outstanding.add(msg_id)
1121 self.outstanding.add(msg_id)
1122 if ident:
1122 if ident:
1123 # possibly routed to a specific engine
1123 # possibly routed to a specific engine
1124 if isinstance(ident, list):
1124 if isinstance(ident, list):
1125 ident = ident[-1]
1125 ident = ident[-1]
1126 if ident in self._engines.values():
1126 if ident in self._engines.values():
1127 # save for later, in case of engine death
1127 # save for later, in case of engine death
1128 self._outstanding_dict[ident].add(msg_id)
1128 self._outstanding_dict[ident].add(msg_id)
1129 self.history.append(msg_id)
1129 self.history.append(msg_id)
1130 self.metadata[msg_id]['submitted'] = datetime.now()
1130 self.metadata[msg_id]['submitted'] = datetime.now()
1131
1131
1132 return msg
1132 return msg
1133
1133
1134 #--------------------------------------------------------------------------
1134 #--------------------------------------------------------------------------
1135 # construct a View object
1135 # construct a View object
1136 #--------------------------------------------------------------------------
1136 #--------------------------------------------------------------------------
1137
1137
1138 def load_balanced_view(self, targets=None):
1138 def load_balanced_view(self, targets=None):
1139 """construct a DirectView object.
1139 """construct a DirectView object.
1140
1140
1141 If no arguments are specified, create a LoadBalancedView
1141 If no arguments are specified, create a LoadBalancedView
1142 using all engines.
1142 using all engines.
1143
1143
1144 Parameters
1144 Parameters
1145 ----------
1145 ----------
1146
1146
1147 targets: list,slice,int,etc. [default: use all engines]
1147 targets: list,slice,int,etc. [default: use all engines]
1148 The subset of engines across which to load-balance
1148 The subset of engines across which to load-balance
1149 """
1149 """
1150 if targets == 'all':
1150 if targets == 'all':
1151 targets = None
1151 targets = None
1152 if targets is not None:
1152 if targets is not None:
1153 targets = self._build_targets(targets)[1]
1153 targets = self._build_targets(targets)[1]
1154 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1154 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1155
1155
1156 def direct_view(self, targets='all'):
1156 def direct_view(self, targets='all'):
1157 """construct a DirectView object.
1157 """construct a DirectView object.
1158
1158
1159 If no targets are specified, create a DirectView using all engines.
1159 If no targets are specified, create a DirectView using all engines.
1160
1160
1161 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1161 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1162 evaluate the target engines at each execution, whereas rc[:] will connect to
1162 evaluate the target engines at each execution, whereas rc[:] will connect to
1163 all *current* engines, and that list will not change.
1163 all *current* engines, and that list will not change.
1164
1164
1165 That is, 'all' will always use all engines, whereas rc[:] will not use
1165 That is, 'all' will always use all engines, whereas rc[:] will not use
1166 engines added after the DirectView is constructed.
1166 engines added after the DirectView is constructed.
1167
1167
1168 Parameters
1168 Parameters
1169 ----------
1169 ----------
1170
1170
1171 targets: list,slice,int,etc. [default: use all engines]
1171 targets: list,slice,int,etc. [default: use all engines]
1172 The engines to use for the View
1172 The engines to use for the View
1173 """
1173 """
1174 single = isinstance(targets, int)
1174 single = isinstance(targets, int)
1175 # allow 'all' to be lazily evaluated at each execution
1175 # allow 'all' to be lazily evaluated at each execution
1176 if targets != 'all':
1176 if targets != 'all':
1177 targets = self._build_targets(targets)[1]
1177 targets = self._build_targets(targets)[1]
1178 if single:
1178 if single:
1179 targets = targets[0]
1179 targets = targets[0]
1180 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1180 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1181
1181
1182 #--------------------------------------------------------------------------
1182 #--------------------------------------------------------------------------
1183 # Query methods
1183 # Query methods
1184 #--------------------------------------------------------------------------
1184 #--------------------------------------------------------------------------
1185
1185
1186 @spin_first
1186 @spin_first
1187 def get_result(self, indices_or_msg_ids=None, block=None):
1187 def get_result(self, indices_or_msg_ids=None, block=None):
1188 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1188 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1189
1189
1190 If the client already has the results, no request to the Hub will be made.
1190 If the client already has the results, no request to the Hub will be made.
1191
1191
1192 This is a convenient way to construct AsyncResult objects, which are wrappers
1192 This is a convenient way to construct AsyncResult objects, which are wrappers
1193 that include metadata about execution, and allow for awaiting results that
1193 that include metadata about execution, and allow for awaiting results that
1194 were not submitted by this Client.
1194 were not submitted by this Client.
1195
1195
1196 It can also be a convenient way to retrieve the metadata associated with
1196 It can also be a convenient way to retrieve the metadata associated with
1197 blocking execution, since it always retrieves
1197 blocking execution, since it always retrieves
1198
1198
1199 Examples
1199 Examples
1200 --------
1200 --------
1201 ::
1201 ::
1202
1202
1203 In [10]: r = client.apply()
1203 In [10]: r = client.apply()
1204
1204
1205 Parameters
1205 Parameters
1206 ----------
1206 ----------
1207
1207
1208 indices_or_msg_ids : integer history index, str msg_id, or list of either
1208 indices_or_msg_ids : integer history index, str msg_id, or list of either
1209 The indices or msg_ids of indices to be retrieved
1209 The indices or msg_ids of indices to be retrieved
1210
1210
1211 block : bool
1211 block : bool
1212 Whether to wait for the result to be done
1212 Whether to wait for the result to be done
1213
1213
1214 Returns
1214 Returns
1215 -------
1215 -------
1216
1216
1217 AsyncResult
1217 AsyncResult
1218 A single AsyncResult object will always be returned.
1218 A single AsyncResult object will always be returned.
1219
1219
1220 AsyncHubResult
1220 AsyncHubResult
1221 A subclass of AsyncResult that retrieves results from the Hub
1221 A subclass of AsyncResult that retrieves results from the Hub
1222
1222
1223 """
1223 """
1224 block = self.block if block is None else block
1224 block = self.block if block is None else block
1225 if indices_or_msg_ids is None:
1225 if indices_or_msg_ids is None:
1226 indices_or_msg_ids = -1
1226 indices_or_msg_ids = -1
1227
1227
1228 if not isinstance(indices_or_msg_ids, (list,tuple)):
1228 if not isinstance(indices_or_msg_ids, (list,tuple)):
1229 indices_or_msg_ids = [indices_or_msg_ids]
1229 indices_or_msg_ids = [indices_or_msg_ids]
1230
1230
1231 theids = []
1231 theids = []
1232 for id in indices_or_msg_ids:
1232 for id in indices_or_msg_ids:
1233 if isinstance(id, int):
1233 if isinstance(id, int):
1234 id = self.history[id]
1234 id = self.history[id]
1235 if not isinstance(id, basestring):
1235 if not isinstance(id, basestring):
1236 raise TypeError("indices must be str or int, not %r"%id)
1236 raise TypeError("indices must be str or int, not %r"%id)
1237 theids.append(id)
1237 theids.append(id)
1238
1238
1239 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1239 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1240 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1240 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1241
1241
1242 if remote_ids:
1242 if remote_ids:
1243 ar = AsyncHubResult(self, msg_ids=theids)
1243 ar = AsyncHubResult(self, msg_ids=theids)
1244 else:
1244 else:
1245 ar = AsyncResult(self, msg_ids=theids)
1245 ar = AsyncResult(self, msg_ids=theids)
1246
1246
1247 if block:
1247 if block:
1248 ar.wait()
1248 ar.wait()
1249
1249
1250 return ar
1250 return ar
1251
1251
1252 @spin_first
1252 @spin_first
1253 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1253 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1254 """Resubmit one or more tasks.
1254 """Resubmit one or more tasks.
1255
1255
1256 in-flight tasks may not be resubmitted.
1256 in-flight tasks may not be resubmitted.
1257
1257
1258 Parameters
1258 Parameters
1259 ----------
1259 ----------
1260
1260
1261 indices_or_msg_ids : integer history index, str msg_id, or list of either
1261 indices_or_msg_ids : integer history index, str msg_id, or list of either
1262 The indices or msg_ids of indices to be retrieved
1262 The indices or msg_ids of indices to be retrieved
1263
1263
1264 block : bool
1264 block : bool
1265 Whether to wait for the result to be done
1265 Whether to wait for the result to be done
1266
1266
1267 Returns
1267 Returns
1268 -------
1268 -------
1269
1269
1270 AsyncHubResult
1270 AsyncHubResult
1271 A subclass of AsyncResult that retrieves results from the Hub
1271 A subclass of AsyncResult that retrieves results from the Hub
1272
1272
1273 """
1273 """
1274 block = self.block if block is None else block
1274 block = self.block if block is None else block
1275 if indices_or_msg_ids is None:
1275 if indices_or_msg_ids is None:
1276 indices_or_msg_ids = -1
1276 indices_or_msg_ids = -1
1277
1277
1278 if not isinstance(indices_or_msg_ids, (list,tuple)):
1278 if not isinstance(indices_or_msg_ids, (list,tuple)):
1279 indices_or_msg_ids = [indices_or_msg_ids]
1279 indices_or_msg_ids = [indices_or_msg_ids]
1280
1280
1281 theids = []
1281 theids = []
1282 for id in indices_or_msg_ids:
1282 for id in indices_or_msg_ids:
1283 if isinstance(id, int):
1283 if isinstance(id, int):
1284 id = self.history[id]
1284 id = self.history[id]
1285 if not isinstance(id, basestring):
1285 if not isinstance(id, basestring):
1286 raise TypeError("indices must be str or int, not %r"%id)
1286 raise TypeError("indices must be str or int, not %r"%id)
1287 theids.append(id)
1287 theids.append(id)
1288
1288
1289 for msg_id in theids:
1290 self.outstanding.discard(msg_id)
1291 if msg_id in self.history:
1292 self.history.remove(msg_id)
1293 self.results.pop(msg_id, None)
1294 self.metadata.pop(msg_id, None)
1295 content = dict(msg_ids = theids)
1289 content = dict(msg_ids = theids)
1296
1290
1297 self.session.send(self._query_socket, 'resubmit_request', content)
1291 self.session.send(self._query_socket, 'resubmit_request', content)
1298
1292
1299 zmq.select([self._query_socket], [], [])
1293 zmq.select([self._query_socket], [], [])
1300 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1294 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1301 if self.debug:
1295 if self.debug:
1302 pprint(msg)
1296 pprint(msg)
1303 content = msg['content']
1297 content = msg['content']
1304 if content['status'] != 'ok':
1298 if content['status'] != 'ok':
1305 raise self._unwrap_exception(content)
1299 raise self._unwrap_exception(content)
1300 mapping = content['resubmitted']
1301 new_ids = [ mapping[msg_id] for msg_id in theids ]
1306
1302
1307 ar = AsyncHubResult(self, msg_ids=theids)
1303 ar = AsyncHubResult(self, msg_ids=new_ids)
1308
1304
1309 if block:
1305 if block:
1310 ar.wait()
1306 ar.wait()
1311
1307
1312 return ar
1308 return ar
1313
1309
1314 @spin_first
1310 @spin_first
1315 def result_status(self, msg_ids, status_only=True):
1311 def result_status(self, msg_ids, status_only=True):
1316 """Check on the status of the result(s) of the apply request with `msg_ids`.
1312 """Check on the status of the result(s) of the apply request with `msg_ids`.
1317
1313
1318 If status_only is False, then the actual results will be retrieved, else
1314 If status_only is False, then the actual results will be retrieved, else
1319 only the status of the results will be checked.
1315 only the status of the results will be checked.
1320
1316
1321 Parameters
1317 Parameters
1322 ----------
1318 ----------
1323
1319
1324 msg_ids : list of msg_ids
1320 msg_ids : list of msg_ids
1325 if int:
1321 if int:
1326 Passed as index to self.history for convenience.
1322 Passed as index to self.history for convenience.
1327 status_only : bool (default: True)
1323 status_only : bool (default: True)
1328 if False:
1324 if False:
1329 Retrieve the actual results of completed tasks.
1325 Retrieve the actual results of completed tasks.
1330
1326
1331 Returns
1327 Returns
1332 -------
1328 -------
1333
1329
1334 results : dict
1330 results : dict
1335 There will always be the keys 'pending' and 'completed', which will
1331 There will always be the keys 'pending' and 'completed', which will
1336 be lists of msg_ids that are incomplete or complete. If `status_only`
1332 be lists of msg_ids that are incomplete or complete. If `status_only`
1337 is False, then completed results will be keyed by their `msg_id`.
1333 is False, then completed results will be keyed by their `msg_id`.
1338 """
1334 """
1339 if not isinstance(msg_ids, (list,tuple)):
1335 if not isinstance(msg_ids, (list,tuple)):
1340 msg_ids = [msg_ids]
1336 msg_ids = [msg_ids]
1341
1337
1342 theids = []
1338 theids = []
1343 for msg_id in msg_ids:
1339 for msg_id in msg_ids:
1344 if isinstance(msg_id, int):
1340 if isinstance(msg_id, int):
1345 msg_id = self.history[msg_id]
1341 msg_id = self.history[msg_id]
1346 if not isinstance(msg_id, basestring):
1342 if not isinstance(msg_id, basestring):
1347 raise TypeError("msg_ids must be str, not %r"%msg_id)
1343 raise TypeError("msg_ids must be str, not %r"%msg_id)
1348 theids.append(msg_id)
1344 theids.append(msg_id)
1349
1345
1350 completed = []
1346 completed = []
1351 local_results = {}
1347 local_results = {}
1352
1348
1353 # comment this block out to temporarily disable local shortcut:
1349 # comment this block out to temporarily disable local shortcut:
1354 for msg_id in theids:
1350 for msg_id in theids:
1355 if msg_id in self.results:
1351 if msg_id in self.results:
1356 completed.append(msg_id)
1352 completed.append(msg_id)
1357 local_results[msg_id] = self.results[msg_id]
1353 local_results[msg_id] = self.results[msg_id]
1358 theids.remove(msg_id)
1354 theids.remove(msg_id)
1359
1355
1360 if theids: # some not locally cached
1356 if theids: # some not locally cached
1361 content = dict(msg_ids=theids, status_only=status_only)
1357 content = dict(msg_ids=theids, status_only=status_only)
1362 msg = self.session.send(self._query_socket, "result_request", content=content)
1358 msg = self.session.send(self._query_socket, "result_request", content=content)
1363 zmq.select([self._query_socket], [], [])
1359 zmq.select([self._query_socket], [], [])
1364 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1360 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1365 if self.debug:
1361 if self.debug:
1366 pprint(msg)
1362 pprint(msg)
1367 content = msg['content']
1363 content = msg['content']
1368 if content['status'] != 'ok':
1364 if content['status'] != 'ok':
1369 raise self._unwrap_exception(content)
1365 raise self._unwrap_exception(content)
1370 buffers = msg['buffers']
1366 buffers = msg['buffers']
1371 else:
1367 else:
1372 content = dict(completed=[],pending=[])
1368 content = dict(completed=[],pending=[])
1373
1369
1374 content['completed'].extend(completed)
1370 content['completed'].extend(completed)
1375
1371
1376 if status_only:
1372 if status_only:
1377 return content
1373 return content
1378
1374
1379 failures = []
1375 failures = []
1380 # load cached results into result:
1376 # load cached results into result:
1381 content.update(local_results)
1377 content.update(local_results)
1382
1378
1383 # update cache with results:
1379 # update cache with results:
1384 for msg_id in sorted(theids):
1380 for msg_id in sorted(theids):
1385 if msg_id in content['completed']:
1381 if msg_id in content['completed']:
1386 rec = content[msg_id]
1382 rec = content[msg_id]
1387 parent = rec['header']
1383 parent = rec['header']
1388 header = rec['result_header']
1384 header = rec['result_header']
1389 rcontent = rec['result_content']
1385 rcontent = rec['result_content']
1390 iodict = rec['io']
1386 iodict = rec['io']
1391 if isinstance(rcontent, str):
1387 if isinstance(rcontent, str):
1392 rcontent = self.session.unpack(rcontent)
1388 rcontent = self.session.unpack(rcontent)
1393
1389
1394 md = self.metadata[msg_id]
1390 md = self.metadata[msg_id]
1395 md.update(self._extract_metadata(header, parent, rcontent))
1391 md.update(self._extract_metadata(header, parent, rcontent))
1396 if rec.get('received'):
1392 if rec.get('received'):
1397 md['received'] = rec['received']
1393 md['received'] = rec['received']
1398 md.update(iodict)
1394 md.update(iodict)
1399
1395
1400 if rcontent['status'] == 'ok':
1396 if rcontent['status'] == 'ok':
1401 res,buffers = util.unserialize_object(buffers)
1397 res,buffers = util.unserialize_object(buffers)
1402 else:
1398 else:
1403 print rcontent
1399 print rcontent
1404 res = self._unwrap_exception(rcontent)
1400 res = self._unwrap_exception(rcontent)
1405 failures.append(res)
1401 failures.append(res)
1406
1402
1407 self.results[msg_id] = res
1403 self.results[msg_id] = res
1408 content[msg_id] = res
1404 content[msg_id] = res
1409
1405
1410 if len(theids) == 1 and failures:
1406 if len(theids) == 1 and failures:
1411 raise failures[0]
1407 raise failures[0]
1412
1408
1413 error.collect_exceptions(failures, "result_status")
1409 error.collect_exceptions(failures, "result_status")
1414 return content
1410 return content
1415
1411
1416 @spin_first
1412 @spin_first
1417 def queue_status(self, targets='all', verbose=False):
1413 def queue_status(self, targets='all', verbose=False):
1418 """Fetch the status of engine queues.
1414 """Fetch the status of engine queues.
1419
1415
1420 Parameters
1416 Parameters
1421 ----------
1417 ----------
1422
1418
1423 targets : int/str/list of ints/strs
1419 targets : int/str/list of ints/strs
1424 the engines whose states are to be queried.
1420 the engines whose states are to be queried.
1425 default : all
1421 default : all
1426 verbose : bool
1422 verbose : bool
1427 Whether to return lengths only, or lists of ids for each element
1423 Whether to return lengths only, or lists of ids for each element
1428 """
1424 """
1429 if targets == 'all':
1425 if targets == 'all':
1430 # allow 'all' to be evaluated on the engine
1426 # allow 'all' to be evaluated on the engine
1431 engine_ids = None
1427 engine_ids = None
1432 else:
1428 else:
1433 engine_ids = self._build_targets(targets)[1]
1429 engine_ids = self._build_targets(targets)[1]
1434 content = dict(targets=engine_ids, verbose=verbose)
1430 content = dict(targets=engine_ids, verbose=verbose)
1435 self.session.send(self._query_socket, "queue_request", content=content)
1431 self.session.send(self._query_socket, "queue_request", content=content)
1436 idents,msg = self.session.recv(self._query_socket, 0)
1432 idents,msg = self.session.recv(self._query_socket, 0)
1437 if self.debug:
1433 if self.debug:
1438 pprint(msg)
1434 pprint(msg)
1439 content = msg['content']
1435 content = msg['content']
1440 status = content.pop('status')
1436 status = content.pop('status')
1441 if status != 'ok':
1437 if status != 'ok':
1442 raise self._unwrap_exception(content)
1438 raise self._unwrap_exception(content)
1443 content = rekey(content)
1439 content = rekey(content)
1444 if isinstance(targets, int):
1440 if isinstance(targets, int):
1445 return content[targets]
1441 return content[targets]
1446 else:
1442 else:
1447 return content
1443 return content
1448
1444
1449 @spin_first
1445 @spin_first
1450 def purge_results(self, jobs=[], targets=[]):
1446 def purge_results(self, jobs=[], targets=[]):
1451 """Tell the Hub to forget results.
1447 """Tell the Hub to forget results.
1452
1448
1453 Individual results can be purged by msg_id, or the entire
1449 Individual results can be purged by msg_id, or the entire
1454 history of specific targets can be purged.
1450 history of specific targets can be purged.
1455
1451
1456 Use `purge_results('all')` to scrub everything from the Hub's db.
1452 Use `purge_results('all')` to scrub everything from the Hub's db.
1457
1453
1458 Parameters
1454 Parameters
1459 ----------
1455 ----------
1460
1456
1461 jobs : str or list of str or AsyncResult objects
1457 jobs : str or list of str or AsyncResult objects
1462 the msg_ids whose results should be forgotten.
1458 the msg_ids whose results should be forgotten.
1463 targets : int/str/list of ints/strs
1459 targets : int/str/list of ints/strs
1464 The targets, by int_id, whose entire history is to be purged.
1460 The targets, by int_id, whose entire history is to be purged.
1465
1461
1466 default : None
1462 default : None
1467 """
1463 """
1468 if not targets and not jobs:
1464 if not targets and not jobs:
1469 raise ValueError("Must specify at least one of `targets` and `jobs`")
1465 raise ValueError("Must specify at least one of `targets` and `jobs`")
1470 if targets:
1466 if targets:
1471 targets = self._build_targets(targets)[1]
1467 targets = self._build_targets(targets)[1]
1472
1468
1473 # construct msg_ids from jobs
1469 # construct msg_ids from jobs
1474 if jobs == 'all':
1470 if jobs == 'all':
1475 msg_ids = jobs
1471 msg_ids = jobs
1476 else:
1472 else:
1477 msg_ids = []
1473 msg_ids = []
1478 if isinstance(jobs, (basestring,AsyncResult)):
1474 if isinstance(jobs, (basestring,AsyncResult)):
1479 jobs = [jobs]
1475 jobs = [jobs]
1480 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1476 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1481 if bad_ids:
1477 if bad_ids:
1482 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1478 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1483 for j in jobs:
1479 for j in jobs:
1484 if isinstance(j, AsyncResult):
1480 if isinstance(j, AsyncResult):
1485 msg_ids.extend(j.msg_ids)
1481 msg_ids.extend(j.msg_ids)
1486 else:
1482 else:
1487 msg_ids.append(j)
1483 msg_ids.append(j)
1488
1484
1489 content = dict(engine_ids=targets, msg_ids=msg_ids)
1485 content = dict(engine_ids=targets, msg_ids=msg_ids)
1490 self.session.send(self._query_socket, "purge_request", content=content)
1486 self.session.send(self._query_socket, "purge_request", content=content)
1491 idents, msg = self.session.recv(self._query_socket, 0)
1487 idents, msg = self.session.recv(self._query_socket, 0)
1492 if self.debug:
1488 if self.debug:
1493 pprint(msg)
1489 pprint(msg)
1494 content = msg['content']
1490 content = msg['content']
1495 if content['status'] != 'ok':
1491 if content['status'] != 'ok':
1496 raise self._unwrap_exception(content)
1492 raise self._unwrap_exception(content)
1497
1493
1498 @spin_first
1494 @spin_first
1499 def hub_history(self):
1495 def hub_history(self):
1500 """Get the Hub's history
1496 """Get the Hub's history
1501
1497
1502 Just like the Client, the Hub has a history, which is a list of msg_ids.
1498 Just like the Client, the Hub has a history, which is a list of msg_ids.
1503 This will contain the history of all clients, and, depending on configuration,
1499 This will contain the history of all clients, and, depending on configuration,
1504 may contain history across multiple cluster sessions.
1500 may contain history across multiple cluster sessions.
1505
1501
1506 Any msg_id returned here is a valid argument to `get_result`.
1502 Any msg_id returned here is a valid argument to `get_result`.
1507
1503
1508 Returns
1504 Returns
1509 -------
1505 -------
1510
1506
1511 msg_ids : list of strs
1507 msg_ids : list of strs
1512 list of all msg_ids, ordered by task submission time.
1508 list of all msg_ids, ordered by task submission time.
1513 """
1509 """
1514
1510
1515 self.session.send(self._query_socket, "history_request", content={})
1511 self.session.send(self._query_socket, "history_request", content={})
1516 idents, msg = self.session.recv(self._query_socket, 0)
1512 idents, msg = self.session.recv(self._query_socket, 0)
1517
1513
1518 if self.debug:
1514 if self.debug:
1519 pprint(msg)
1515 pprint(msg)
1520 content = msg['content']
1516 content = msg['content']
1521 if content['status'] != 'ok':
1517 if content['status'] != 'ok':
1522 raise self._unwrap_exception(content)
1518 raise self._unwrap_exception(content)
1523 else:
1519 else:
1524 return content['history']
1520 return content['history']
1525
1521
1526 @spin_first
1522 @spin_first
1527 def db_query(self, query, keys=None):
1523 def db_query(self, query, keys=None):
1528 """Query the Hub's TaskRecord database
1524 """Query the Hub's TaskRecord database
1529
1525
1530 This will return a list of task record dicts that match `query`
1526 This will return a list of task record dicts that match `query`
1531
1527
1532 Parameters
1528 Parameters
1533 ----------
1529 ----------
1534
1530
1535 query : mongodb query dict
1531 query : mongodb query dict
1536 The search dict. See mongodb query docs for details.
1532 The search dict. See mongodb query docs for details.
1537 keys : list of strs [optional]
1533 keys : list of strs [optional]
1538 The subset of keys to be returned. The default is to fetch everything but buffers.
1534 The subset of keys to be returned. The default is to fetch everything but buffers.
1539 'msg_id' will *always* be included.
1535 'msg_id' will *always* be included.
1540 """
1536 """
1541 if isinstance(keys, basestring):
1537 if isinstance(keys, basestring):
1542 keys = [keys]
1538 keys = [keys]
1543 content = dict(query=query, keys=keys)
1539 content = dict(query=query, keys=keys)
1544 self.session.send(self._query_socket, "db_request", content=content)
1540 self.session.send(self._query_socket, "db_request", content=content)
1545 idents, msg = self.session.recv(self._query_socket, 0)
1541 idents, msg = self.session.recv(self._query_socket, 0)
1546 if self.debug:
1542 if self.debug:
1547 pprint(msg)
1543 pprint(msg)
1548 content = msg['content']
1544 content = msg['content']
1549 if content['status'] != 'ok':
1545 if content['status'] != 'ok':
1550 raise self._unwrap_exception(content)
1546 raise self._unwrap_exception(content)
1551
1547
1552 records = content['records']
1548 records = content['records']
1553
1549
1554 buffer_lens = content['buffer_lens']
1550 buffer_lens = content['buffer_lens']
1555 result_buffer_lens = content['result_buffer_lens']
1551 result_buffer_lens = content['result_buffer_lens']
1556 buffers = msg['buffers']
1552 buffers = msg['buffers']
1557 has_bufs = buffer_lens is not None
1553 has_bufs = buffer_lens is not None
1558 has_rbufs = result_buffer_lens is not None
1554 has_rbufs = result_buffer_lens is not None
1559 for i,rec in enumerate(records):
1555 for i,rec in enumerate(records):
1560 # relink buffers
1556 # relink buffers
1561 if has_bufs:
1557 if has_bufs:
1562 blen = buffer_lens[i]
1558 blen = buffer_lens[i]
1563 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1559 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1564 if has_rbufs:
1560 if has_rbufs:
1565 blen = result_buffer_lens[i]
1561 blen = result_buffer_lens[i]
1566 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1562 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1567
1563
1568 return records
1564 return records
1569
1565
1570 __all__ = [ 'Client' ]
1566 __all__ = [ 'Client' ]
@@ -1,1304 +1,1310 b''
1 """The IPython Controller Hub with 0MQ
1 """The IPython Controller Hub with 0MQ
2 This is the master object that handles connections from engines and clients,
2 This is the master object that handles connections from engines and clients,
3 and monitors traffic through the various queues.
3 and monitors traffic through the various queues.
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 """
8 """
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2010-2011 The IPython Development Team
10 # Copyright (C) 2010-2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 from __future__ import print_function
19 from __future__ import print_function
20
20
21 import sys
21 import sys
22 import time
22 import time
23 from datetime import datetime
23 from datetime import datetime
24
24
25 import zmq
25 import zmq
26 from zmq.eventloop import ioloop
26 from zmq.eventloop import ioloop
27 from zmq.eventloop.zmqstream import ZMQStream
27 from zmq.eventloop.zmqstream import ZMQStream
28
28
29 # internal:
29 # internal:
30 from IPython.utils.importstring import import_item
30 from IPython.utils.importstring import import_item
31 from IPython.utils.py3compat import cast_bytes
31 from IPython.utils.py3compat import cast_bytes
32 from IPython.utils.traitlets import (
32 from IPython.utils.traitlets import (
33 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
33 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
34 )
34 )
35
35
36 from IPython.parallel import error, util
36 from IPython.parallel import error, util
37 from IPython.parallel.factory import RegistrationFactory
37 from IPython.parallel.factory import RegistrationFactory
38
38
39 from IPython.zmq.session import SessionFactory
39 from IPython.zmq.session import SessionFactory
40
40
41 from .heartmonitor import HeartMonitor
41 from .heartmonitor import HeartMonitor
42
42
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44 # Code
44 # Code
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46
46
47 def _passer(*args, **kwargs):
47 def _passer(*args, **kwargs):
48 return
48 return
49
49
50 def _printer(*args, **kwargs):
50 def _printer(*args, **kwargs):
51 print (args)
51 print (args)
52 print (kwargs)
52 print (kwargs)
53
53
54 def empty_record():
54 def empty_record():
55 """Return an empty dict with all record keys."""
55 """Return an empty dict with all record keys."""
56 return {
56 return {
57 'msg_id' : None,
57 'msg_id' : None,
58 'header' : None,
58 'header' : None,
59 'content': None,
59 'content': None,
60 'buffers': None,
60 'buffers': None,
61 'submitted': None,
61 'submitted': None,
62 'client_uuid' : None,
62 'client_uuid' : None,
63 'engine_uuid' : None,
63 'engine_uuid' : None,
64 'started': None,
64 'started': None,
65 'completed': None,
65 'completed': None,
66 'resubmitted': None,
66 'resubmitted': None,
67 'received': None,
67 'received': None,
68 'result_header' : None,
68 'result_header' : None,
69 'result_content' : None,
69 'result_content' : None,
70 'result_buffers' : None,
70 'result_buffers' : None,
71 'queue' : None,
71 'queue' : None,
72 'pyin' : None,
72 'pyin' : None,
73 'pyout': None,
73 'pyout': None,
74 'pyerr': None,
74 'pyerr': None,
75 'stdout': '',
75 'stdout': '',
76 'stderr': '',
76 'stderr': '',
77 }
77 }
78
78
79 def init_record(msg):
79 def init_record(msg):
80 """Initialize a TaskRecord based on a request."""
80 """Initialize a TaskRecord based on a request."""
81 header = msg['header']
81 header = msg['header']
82 return {
82 return {
83 'msg_id' : header['msg_id'],
83 'msg_id' : header['msg_id'],
84 'header' : header,
84 'header' : header,
85 'content': msg['content'],
85 'content': msg['content'],
86 'buffers': msg['buffers'],
86 'buffers': msg['buffers'],
87 'submitted': header['date'],
87 'submitted': header['date'],
88 'client_uuid' : None,
88 'client_uuid' : None,
89 'engine_uuid' : None,
89 'engine_uuid' : None,
90 'started': None,
90 'started': None,
91 'completed': None,
91 'completed': None,
92 'resubmitted': None,
92 'resubmitted': None,
93 'received': None,
93 'received': None,
94 'result_header' : None,
94 'result_header' : None,
95 'result_content' : None,
95 'result_content' : None,
96 'result_buffers' : None,
96 'result_buffers' : None,
97 'queue' : None,
97 'queue' : None,
98 'pyin' : None,
98 'pyin' : None,
99 'pyout': None,
99 'pyout': None,
100 'pyerr': None,
100 'pyerr': None,
101 'stdout': '',
101 'stdout': '',
102 'stderr': '',
102 'stderr': '',
103 }
103 }
104
104
105
105
106 class EngineConnector(HasTraits):
106 class EngineConnector(HasTraits):
107 """A simple object for accessing the various zmq connections of an object.
107 """A simple object for accessing the various zmq connections of an object.
108 Attributes are:
108 Attributes are:
109 id (int): engine ID
109 id (int): engine ID
110 uuid (str): uuid (unused?)
110 uuid (str): uuid (unused?)
111 queue (str): identity of queue's XREQ socket
111 queue (str): identity of queue's XREQ socket
112 registration (str): identity of registration XREQ socket
112 registration (str): identity of registration XREQ socket
113 heartbeat (str): identity of heartbeat XREQ socket
113 heartbeat (str): identity of heartbeat XREQ socket
114 """
114 """
115 id=Integer(0)
115 id=Integer(0)
116 queue=CBytes()
116 queue=CBytes()
117 control=CBytes()
117 control=CBytes()
118 registration=CBytes()
118 registration=CBytes()
119 heartbeat=CBytes()
119 heartbeat=CBytes()
120 pending=Set()
120 pending=Set()
121
121
122 class HubFactory(RegistrationFactory):
122 class HubFactory(RegistrationFactory):
123 """The Configurable for setting up a Hub."""
123 """The Configurable for setting up a Hub."""
124
124
125 # port-pairs for monitoredqueues:
125 # port-pairs for monitoredqueues:
126 hb = Tuple(Integer,Integer,config=True,
126 hb = Tuple(Integer,Integer,config=True,
127 help="""XREQ/SUB Port pair for Engine heartbeats""")
127 help="""XREQ/SUB Port pair for Engine heartbeats""")
128 def _hb_default(self):
128 def _hb_default(self):
129 return tuple(util.select_random_ports(2))
129 return tuple(util.select_random_ports(2))
130
130
131 mux = Tuple(Integer,Integer,config=True,
131 mux = Tuple(Integer,Integer,config=True,
132 help="""Engine/Client Port pair for MUX queue""")
132 help="""Engine/Client Port pair for MUX queue""")
133
133
134 def _mux_default(self):
134 def _mux_default(self):
135 return tuple(util.select_random_ports(2))
135 return tuple(util.select_random_ports(2))
136
136
137 task = Tuple(Integer,Integer,config=True,
137 task = Tuple(Integer,Integer,config=True,
138 help="""Engine/Client Port pair for Task queue""")
138 help="""Engine/Client Port pair for Task queue""")
139 def _task_default(self):
139 def _task_default(self):
140 return tuple(util.select_random_ports(2))
140 return tuple(util.select_random_ports(2))
141
141
142 control = Tuple(Integer,Integer,config=True,
142 control = Tuple(Integer,Integer,config=True,
143 help="""Engine/Client Port pair for Control queue""")
143 help="""Engine/Client Port pair for Control queue""")
144
144
145 def _control_default(self):
145 def _control_default(self):
146 return tuple(util.select_random_ports(2))
146 return tuple(util.select_random_ports(2))
147
147
148 iopub = Tuple(Integer,Integer,config=True,
148 iopub = Tuple(Integer,Integer,config=True,
149 help="""Engine/Client Port pair for IOPub relay""")
149 help="""Engine/Client Port pair for IOPub relay""")
150
150
151 def _iopub_default(self):
151 def _iopub_default(self):
152 return tuple(util.select_random_ports(2))
152 return tuple(util.select_random_ports(2))
153
153
154 # single ports:
154 # single ports:
155 mon_port = Integer(config=True,
155 mon_port = Integer(config=True,
156 help="""Monitor (SUB) port for queue traffic""")
156 help="""Monitor (SUB) port for queue traffic""")
157
157
158 def _mon_port_default(self):
158 def _mon_port_default(self):
159 return util.select_random_ports(1)[0]
159 return util.select_random_ports(1)[0]
160
160
161 notifier_port = Integer(config=True,
161 notifier_port = Integer(config=True,
162 help="""PUB port for sending engine status notifications""")
162 help="""PUB port for sending engine status notifications""")
163
163
164 def _notifier_port_default(self):
164 def _notifier_port_default(self):
165 return util.select_random_ports(1)[0]
165 return util.select_random_ports(1)[0]
166
166
167 engine_ip = Unicode('127.0.0.1', config=True,
167 engine_ip = Unicode('127.0.0.1', config=True,
168 help="IP on which to listen for engine connections. [default: loopback]")
168 help="IP on which to listen for engine connections. [default: loopback]")
169 engine_transport = Unicode('tcp', config=True,
169 engine_transport = Unicode('tcp', config=True,
170 help="0MQ transport for engine connections. [default: tcp]")
170 help="0MQ transport for engine connections. [default: tcp]")
171
171
172 client_ip = Unicode('127.0.0.1', config=True,
172 client_ip = Unicode('127.0.0.1', config=True,
173 help="IP on which to listen for client connections. [default: loopback]")
173 help="IP on which to listen for client connections. [default: loopback]")
174 client_transport = Unicode('tcp', config=True,
174 client_transport = Unicode('tcp', config=True,
175 help="0MQ transport for client connections. [default : tcp]")
175 help="0MQ transport for client connections. [default : tcp]")
176
176
177 monitor_ip = Unicode('127.0.0.1', config=True,
177 monitor_ip = Unicode('127.0.0.1', config=True,
178 help="IP on which to listen for monitor messages. [default: loopback]")
178 help="IP on which to listen for monitor messages. [default: loopback]")
179 monitor_transport = Unicode('tcp', config=True,
179 monitor_transport = Unicode('tcp', config=True,
180 help="0MQ transport for monitor messages. [default : tcp]")
180 help="0MQ transport for monitor messages. [default : tcp]")
181
181
182 monitor_url = Unicode('')
182 monitor_url = Unicode('')
183
183
184 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
184 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
185 config=True, help="""The class to use for the DB backend""")
185 config=True, help="""The class to use for the DB backend""")
186
186
187 # not configurable
187 # not configurable
188 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
188 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
189 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
189 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
190
190
191 def _ip_changed(self, name, old, new):
191 def _ip_changed(self, name, old, new):
192 self.engine_ip = new
192 self.engine_ip = new
193 self.client_ip = new
193 self.client_ip = new
194 self.monitor_ip = new
194 self.monitor_ip = new
195 self._update_monitor_url()
195 self._update_monitor_url()
196
196
197 def _update_monitor_url(self):
197 def _update_monitor_url(self):
198 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
198 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
199
199
200 def _transport_changed(self, name, old, new):
200 def _transport_changed(self, name, old, new):
201 self.engine_transport = new
201 self.engine_transport = new
202 self.client_transport = new
202 self.client_transport = new
203 self.monitor_transport = new
203 self.monitor_transport = new
204 self._update_monitor_url()
204 self._update_monitor_url()
205
205
206 def __init__(self, **kwargs):
206 def __init__(self, **kwargs):
207 super(HubFactory, self).__init__(**kwargs)
207 super(HubFactory, self).__init__(**kwargs)
208 self._update_monitor_url()
208 self._update_monitor_url()
209
209
210
210
211 def construct(self):
211 def construct(self):
212 self.init_hub()
212 self.init_hub()
213
213
214 def start(self):
214 def start(self):
215 self.heartmonitor.start()
215 self.heartmonitor.start()
216 self.log.info("Heartmonitor started")
216 self.log.info("Heartmonitor started")
217
217
218 def init_hub(self):
218 def init_hub(self):
219 """construct"""
219 """construct"""
220 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
220 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
221 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
221 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
222
222
223 ctx = self.context
223 ctx = self.context
224 loop = self.loop
224 loop = self.loop
225
225
226 # Registrar socket
226 # Registrar socket
227 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
227 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
228 q.bind(client_iface % self.regport)
228 q.bind(client_iface % self.regport)
229 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
229 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
230 if self.client_ip != self.engine_ip:
230 if self.client_ip != self.engine_ip:
231 q.bind(engine_iface % self.regport)
231 q.bind(engine_iface % self.regport)
232 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
232 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
233
233
234 ### Engine connections ###
234 ### Engine connections ###
235
235
236 # heartbeat
236 # heartbeat
237 hpub = ctx.socket(zmq.PUB)
237 hpub = ctx.socket(zmq.PUB)
238 hpub.bind(engine_iface % self.hb[0])
238 hpub.bind(engine_iface % self.hb[0])
239 hrep = ctx.socket(zmq.ROUTER)
239 hrep = ctx.socket(zmq.ROUTER)
240 hrep.bind(engine_iface % self.hb[1])
240 hrep.bind(engine_iface % self.hb[1])
241 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
241 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
242 pingstream=ZMQStream(hpub,loop),
242 pingstream=ZMQStream(hpub,loop),
243 pongstream=ZMQStream(hrep,loop)
243 pongstream=ZMQStream(hrep,loop)
244 )
244 )
245
245
246 ### Client connections ###
246 ### Client connections ###
247 # Notifier socket
247 # Notifier socket
248 n = ZMQStream(ctx.socket(zmq.PUB), loop)
248 n = ZMQStream(ctx.socket(zmq.PUB), loop)
249 n.bind(client_iface%self.notifier_port)
249 n.bind(client_iface%self.notifier_port)
250
250
251 ### build and launch the queues ###
251 ### build and launch the queues ###
252
252
253 # monitor socket
253 # monitor socket
254 sub = ctx.socket(zmq.SUB)
254 sub = ctx.socket(zmq.SUB)
255 sub.setsockopt(zmq.SUBSCRIBE, b"")
255 sub.setsockopt(zmq.SUBSCRIBE, b"")
256 sub.bind(self.monitor_url)
256 sub.bind(self.monitor_url)
257 sub.bind('inproc://monitor')
257 sub.bind('inproc://monitor')
258 sub = ZMQStream(sub, loop)
258 sub = ZMQStream(sub, loop)
259
259
260 # connect the db
260 # connect the db
261 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
261 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
262 # cdir = self.config.Global.cluster_dir
262 # cdir = self.config.Global.cluster_dir
263 self.db = import_item(str(self.db_class))(session=self.session.session,
263 self.db = import_item(str(self.db_class))(session=self.session.session,
264 config=self.config, log=self.log)
264 config=self.config, log=self.log)
265 time.sleep(.25)
265 time.sleep(.25)
266 try:
266 try:
267 scheme = self.config.TaskScheduler.scheme_name
267 scheme = self.config.TaskScheduler.scheme_name
268 except AttributeError:
268 except AttributeError:
269 from .scheduler import TaskScheduler
269 from .scheduler import TaskScheduler
270 scheme = TaskScheduler.scheme_name.get_default_value()
270 scheme = TaskScheduler.scheme_name.get_default_value()
271 # build connection dicts
271 # build connection dicts
272 self.engine_info = {
272 self.engine_info = {
273 'control' : engine_iface%self.control[1],
273 'control' : engine_iface%self.control[1],
274 'mux': engine_iface%self.mux[1],
274 'mux': engine_iface%self.mux[1],
275 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
275 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
276 'task' : engine_iface%self.task[1],
276 'task' : engine_iface%self.task[1],
277 'iopub' : engine_iface%self.iopub[1],
277 'iopub' : engine_iface%self.iopub[1],
278 # 'monitor' : engine_iface%self.mon_port,
278 # 'monitor' : engine_iface%self.mon_port,
279 }
279 }
280
280
281 self.client_info = {
281 self.client_info = {
282 'control' : client_iface%self.control[0],
282 'control' : client_iface%self.control[0],
283 'mux': client_iface%self.mux[0],
283 'mux': client_iface%self.mux[0],
284 'task' : (scheme, client_iface%self.task[0]),
284 'task' : (scheme, client_iface%self.task[0]),
285 'iopub' : client_iface%self.iopub[0],
285 'iopub' : client_iface%self.iopub[0],
286 'notification': client_iface%self.notifier_port
286 'notification': client_iface%self.notifier_port
287 }
287 }
288 self.log.debug("Hub engine addrs: %s", self.engine_info)
288 self.log.debug("Hub engine addrs: %s", self.engine_info)
289 self.log.debug("Hub client addrs: %s", self.client_info)
289 self.log.debug("Hub client addrs: %s", self.client_info)
290
290
291 # resubmit stream
291 # resubmit stream
292 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
292 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
293 url = util.disambiguate_url(self.client_info['task'][-1])
293 url = util.disambiguate_url(self.client_info['task'][-1])
294 r.setsockopt(zmq.IDENTITY, self.session.bsession)
294 r.setsockopt(zmq.IDENTITY, self.session.bsession)
295 r.connect(url)
295 r.connect(url)
296
296
297 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
297 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
298 query=q, notifier=n, resubmit=r, db=self.db,
298 query=q, notifier=n, resubmit=r, db=self.db,
299 engine_info=self.engine_info, client_info=self.client_info,
299 engine_info=self.engine_info, client_info=self.client_info,
300 log=self.log)
300 log=self.log)
301
301
302
302
303 class Hub(SessionFactory):
303 class Hub(SessionFactory):
304 """The IPython Controller Hub with 0MQ connections
304 """The IPython Controller Hub with 0MQ connections
305
305
306 Parameters
306 Parameters
307 ==========
307 ==========
308 loop: zmq IOLoop instance
308 loop: zmq IOLoop instance
309 session: Session object
309 session: Session object
310 <removed> context: zmq context for creating new connections (?)
310 <removed> context: zmq context for creating new connections (?)
311 queue: ZMQStream for monitoring the command queue (SUB)
311 queue: ZMQStream for monitoring the command queue (SUB)
312 query: ZMQStream for engine registration and client queries requests (XREP)
312 query: ZMQStream for engine registration and client queries requests (XREP)
313 heartbeat: HeartMonitor object checking the pulse of the engines
313 heartbeat: HeartMonitor object checking the pulse of the engines
314 notifier: ZMQStream for broadcasting engine registration changes (PUB)
314 notifier: ZMQStream for broadcasting engine registration changes (PUB)
315 db: connection to db for out of memory logging of commands
315 db: connection to db for out of memory logging of commands
316 NotImplemented
316 NotImplemented
317 engine_info: dict of zmq connection information for engines to connect
317 engine_info: dict of zmq connection information for engines to connect
318 to the queues.
318 to the queues.
319 client_info: dict of zmq connection information for engines to connect
319 client_info: dict of zmq connection information for engines to connect
320 to the queues.
320 to the queues.
321 """
321 """
322 # internal data structures:
322 # internal data structures:
323 ids=Set() # engine IDs
323 ids=Set() # engine IDs
324 keytable=Dict()
324 keytable=Dict()
325 by_ident=Dict()
325 by_ident=Dict()
326 engines=Dict()
326 engines=Dict()
327 clients=Dict()
327 clients=Dict()
328 hearts=Dict()
328 hearts=Dict()
329 pending=Set()
329 pending=Set()
330 queues=Dict() # pending msg_ids keyed by engine_id
330 queues=Dict() # pending msg_ids keyed by engine_id
331 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
331 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
332 completed=Dict() # completed msg_ids keyed by engine_id
332 completed=Dict() # completed msg_ids keyed by engine_id
333 all_completed=Set() # completed msg_ids keyed by engine_id
333 all_completed=Set() # completed msg_ids keyed by engine_id
334 dead_engines=Set() # completed msg_ids keyed by engine_id
334 dead_engines=Set() # completed msg_ids keyed by engine_id
335 unassigned=Set() # set of task msg_ds not yet assigned a destination
335 unassigned=Set() # set of task msg_ds not yet assigned a destination
336 incoming_registrations=Dict()
336 incoming_registrations=Dict()
337 registration_timeout=Integer()
337 registration_timeout=Integer()
338 _idcounter=Integer(0)
338 _idcounter=Integer(0)
339
339
340 # objects from constructor:
340 # objects from constructor:
341 query=Instance(ZMQStream)
341 query=Instance(ZMQStream)
342 monitor=Instance(ZMQStream)
342 monitor=Instance(ZMQStream)
343 notifier=Instance(ZMQStream)
343 notifier=Instance(ZMQStream)
344 resubmit=Instance(ZMQStream)
344 resubmit=Instance(ZMQStream)
345 heartmonitor=Instance(HeartMonitor)
345 heartmonitor=Instance(HeartMonitor)
346 db=Instance(object)
346 db=Instance(object)
347 client_info=Dict()
347 client_info=Dict()
348 engine_info=Dict()
348 engine_info=Dict()
349
349
350
350
351 def __init__(self, **kwargs):
351 def __init__(self, **kwargs):
352 """
352 """
353 # universal:
353 # universal:
354 loop: IOLoop for creating future connections
354 loop: IOLoop for creating future connections
355 session: streamsession for sending serialized data
355 session: streamsession for sending serialized data
356 # engine:
356 # engine:
357 queue: ZMQStream for monitoring queue messages
357 queue: ZMQStream for monitoring queue messages
358 query: ZMQStream for engine+client registration and client requests
358 query: ZMQStream for engine+client registration and client requests
359 heartbeat: HeartMonitor object for tracking engines
359 heartbeat: HeartMonitor object for tracking engines
360 # extra:
360 # extra:
361 db: ZMQStream for db connection (NotImplemented)
361 db: ZMQStream for db connection (NotImplemented)
362 engine_info: zmq address/protocol dict for engine connections
362 engine_info: zmq address/protocol dict for engine connections
363 client_info: zmq address/protocol dict for client connections
363 client_info: zmq address/protocol dict for client connections
364 """
364 """
365
365
366 super(Hub, self).__init__(**kwargs)
366 super(Hub, self).__init__(**kwargs)
367 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
367 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
368
368
369 # validate connection dicts:
369 # validate connection dicts:
370 for k,v in self.client_info.iteritems():
370 for k,v in self.client_info.iteritems():
371 if k == 'task':
371 if k == 'task':
372 util.validate_url_container(v[1])
372 util.validate_url_container(v[1])
373 else:
373 else:
374 util.validate_url_container(v)
374 util.validate_url_container(v)
375 # util.validate_url_container(self.client_info)
375 # util.validate_url_container(self.client_info)
376 util.validate_url_container(self.engine_info)
376 util.validate_url_container(self.engine_info)
377
377
378 # register our callbacks
378 # register our callbacks
379 self.query.on_recv(self.dispatch_query)
379 self.query.on_recv(self.dispatch_query)
380 self.monitor.on_recv(self.dispatch_monitor_traffic)
380 self.monitor.on_recv(self.dispatch_monitor_traffic)
381
381
382 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
382 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
383 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
383 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
384
384
385 self.monitor_handlers = {b'in' : self.save_queue_request,
385 self.monitor_handlers = {b'in' : self.save_queue_request,
386 b'out': self.save_queue_result,
386 b'out': self.save_queue_result,
387 b'intask': self.save_task_request,
387 b'intask': self.save_task_request,
388 b'outtask': self.save_task_result,
388 b'outtask': self.save_task_result,
389 b'tracktask': self.save_task_destination,
389 b'tracktask': self.save_task_destination,
390 b'incontrol': _passer,
390 b'incontrol': _passer,
391 b'outcontrol': _passer,
391 b'outcontrol': _passer,
392 b'iopub': self.save_iopub_message,
392 b'iopub': self.save_iopub_message,
393 }
393 }
394
394
395 self.query_handlers = {'queue_request': self.queue_status,
395 self.query_handlers = {'queue_request': self.queue_status,
396 'result_request': self.get_results,
396 'result_request': self.get_results,
397 'history_request': self.get_history,
397 'history_request': self.get_history,
398 'db_request': self.db_query,
398 'db_request': self.db_query,
399 'purge_request': self.purge_results,
399 'purge_request': self.purge_results,
400 'load_request': self.check_load,
400 'load_request': self.check_load,
401 'resubmit_request': self.resubmit_task,
401 'resubmit_request': self.resubmit_task,
402 'shutdown_request': self.shutdown_request,
402 'shutdown_request': self.shutdown_request,
403 'registration_request' : self.register_engine,
403 'registration_request' : self.register_engine,
404 'unregistration_request' : self.unregister_engine,
404 'unregistration_request' : self.unregister_engine,
405 'connection_request': self.connection_request,
405 'connection_request': self.connection_request,
406 }
406 }
407
407
408 # ignore resubmit replies
408 # ignore resubmit replies
409 self.resubmit.on_recv(lambda msg: None, copy=False)
409 self.resubmit.on_recv(lambda msg: None, copy=False)
410
410
411 self.log.info("hub::created hub")
411 self.log.info("hub::created hub")
412
412
413 @property
413 @property
414 def _next_id(self):
414 def _next_id(self):
415 """gemerate a new ID.
415 """gemerate a new ID.
416
416
417 No longer reuse old ids, just count from 0."""
417 No longer reuse old ids, just count from 0."""
418 newid = self._idcounter
418 newid = self._idcounter
419 self._idcounter += 1
419 self._idcounter += 1
420 return newid
420 return newid
421 # newid = 0
421 # newid = 0
422 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
422 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
423 # # print newid, self.ids, self.incoming_registrations
423 # # print newid, self.ids, self.incoming_registrations
424 # while newid in self.ids or newid in incoming:
424 # while newid in self.ids or newid in incoming:
425 # newid += 1
425 # newid += 1
426 # return newid
426 # return newid
427
427
428 #-----------------------------------------------------------------------------
428 #-----------------------------------------------------------------------------
429 # message validation
429 # message validation
430 #-----------------------------------------------------------------------------
430 #-----------------------------------------------------------------------------
431
431
432 def _validate_targets(self, targets):
432 def _validate_targets(self, targets):
433 """turn any valid targets argument into a list of integer ids"""
433 """turn any valid targets argument into a list of integer ids"""
434 if targets is None:
434 if targets is None:
435 # default to all
435 # default to all
436 return self.ids
436 return self.ids
437
437
438 if isinstance(targets, (int,str,unicode)):
438 if isinstance(targets, (int,str,unicode)):
439 # only one target specified
439 # only one target specified
440 targets = [targets]
440 targets = [targets]
441 _targets = []
441 _targets = []
442 for t in targets:
442 for t in targets:
443 # map raw identities to ids
443 # map raw identities to ids
444 if isinstance(t, (str,unicode)):
444 if isinstance(t, (str,unicode)):
445 t = self.by_ident.get(cast_bytes(t), t)
445 t = self.by_ident.get(cast_bytes(t), t)
446 _targets.append(t)
446 _targets.append(t)
447 targets = _targets
447 targets = _targets
448 bad_targets = [ t for t in targets if t not in self.ids ]
448 bad_targets = [ t for t in targets if t not in self.ids ]
449 if bad_targets:
449 if bad_targets:
450 raise IndexError("No Such Engine: %r" % bad_targets)
450 raise IndexError("No Such Engine: %r" % bad_targets)
451 if not targets:
451 if not targets:
452 raise IndexError("No Engines Registered")
452 raise IndexError("No Engines Registered")
453 return targets
453 return targets
454
454
455 #-----------------------------------------------------------------------------
455 #-----------------------------------------------------------------------------
456 # dispatch methods (1 per stream)
456 # dispatch methods (1 per stream)
457 #-----------------------------------------------------------------------------
457 #-----------------------------------------------------------------------------
458
458
459
459
460 @util.log_errors
460 @util.log_errors
461 def dispatch_monitor_traffic(self, msg):
461 def dispatch_monitor_traffic(self, msg):
462 """all ME and Task queue messages come through here, as well as
462 """all ME and Task queue messages come through here, as well as
463 IOPub traffic."""
463 IOPub traffic."""
464 self.log.debug("monitor traffic: %r", msg[0])
464 self.log.debug("monitor traffic: %r", msg[0])
465 switch = msg[0]
465 switch = msg[0]
466 try:
466 try:
467 idents, msg = self.session.feed_identities(msg[1:])
467 idents, msg = self.session.feed_identities(msg[1:])
468 except ValueError:
468 except ValueError:
469 idents=[]
469 idents=[]
470 if not idents:
470 if not idents:
471 self.log.error("Monitor message without topic: %r", msg)
471 self.log.error("Monitor message without topic: %r", msg)
472 return
472 return
473 handler = self.monitor_handlers.get(switch, None)
473 handler = self.monitor_handlers.get(switch, None)
474 if handler is not None:
474 if handler is not None:
475 handler(idents, msg)
475 handler(idents, msg)
476 else:
476 else:
477 self.log.error("Unrecognized monitor topic: %r", switch)
477 self.log.error("Unrecognized monitor topic: %r", switch)
478
478
479
479
480 @util.log_errors
480 @util.log_errors
481 def dispatch_query(self, msg):
481 def dispatch_query(self, msg):
482 """Route registration requests and queries from clients."""
482 """Route registration requests and queries from clients."""
483 try:
483 try:
484 idents, msg = self.session.feed_identities(msg)
484 idents, msg = self.session.feed_identities(msg)
485 except ValueError:
485 except ValueError:
486 idents = []
486 idents = []
487 if not idents:
487 if not idents:
488 self.log.error("Bad Query Message: %r", msg)
488 self.log.error("Bad Query Message: %r", msg)
489 return
489 return
490 client_id = idents[0]
490 client_id = idents[0]
491 try:
491 try:
492 msg = self.session.unserialize(msg, content=True)
492 msg = self.session.unserialize(msg, content=True)
493 except Exception:
493 except Exception:
494 content = error.wrap_exception()
494 content = error.wrap_exception()
495 self.log.error("Bad Query Message: %r", msg, exc_info=True)
495 self.log.error("Bad Query Message: %r", msg, exc_info=True)
496 self.session.send(self.query, "hub_error", ident=client_id,
496 self.session.send(self.query, "hub_error", ident=client_id,
497 content=content)
497 content=content)
498 return
498 return
499 # print client_id, header, parent, content
499 # print client_id, header, parent, content
500 #switch on message type:
500 #switch on message type:
501 msg_type = msg['header']['msg_type']
501 msg_type = msg['header']['msg_type']
502 self.log.info("client::client %r requested %r", client_id, msg_type)
502 self.log.info("client::client %r requested %r", client_id, msg_type)
503 handler = self.query_handlers.get(msg_type, None)
503 handler = self.query_handlers.get(msg_type, None)
504 try:
504 try:
505 assert handler is not None, "Bad Message Type: %r" % msg_type
505 assert handler is not None, "Bad Message Type: %r" % msg_type
506 except:
506 except:
507 content = error.wrap_exception()
507 content = error.wrap_exception()
508 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
508 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
509 self.session.send(self.query, "hub_error", ident=client_id,
509 self.session.send(self.query, "hub_error", ident=client_id,
510 content=content)
510 content=content)
511 return
511 return
512
512
513 else:
513 else:
514 handler(idents, msg)
514 handler(idents, msg)
515
515
516 def dispatch_db(self, msg):
516 def dispatch_db(self, msg):
517 """"""
517 """"""
518 raise NotImplementedError
518 raise NotImplementedError
519
519
520 #---------------------------------------------------------------------------
520 #---------------------------------------------------------------------------
521 # handler methods (1 per event)
521 # handler methods (1 per event)
522 #---------------------------------------------------------------------------
522 #---------------------------------------------------------------------------
523
523
524 #----------------------- Heartbeat --------------------------------------
524 #----------------------- Heartbeat --------------------------------------
525
525
526 def handle_new_heart(self, heart):
526 def handle_new_heart(self, heart):
527 """handler to attach to heartbeater.
527 """handler to attach to heartbeater.
528 Called when a new heart starts to beat.
528 Called when a new heart starts to beat.
529 Triggers completion of registration."""
529 Triggers completion of registration."""
530 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
530 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
531 if heart not in self.incoming_registrations:
531 if heart not in self.incoming_registrations:
532 self.log.info("heartbeat::ignoring new heart: %r", heart)
532 self.log.info("heartbeat::ignoring new heart: %r", heart)
533 else:
533 else:
534 self.finish_registration(heart)
534 self.finish_registration(heart)
535
535
536
536
537 def handle_heart_failure(self, heart):
537 def handle_heart_failure(self, heart):
538 """handler to attach to heartbeater.
538 """handler to attach to heartbeater.
539 called when a previously registered heart fails to respond to beat request.
539 called when a previously registered heart fails to respond to beat request.
540 triggers unregistration"""
540 triggers unregistration"""
541 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
541 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
542 eid = self.hearts.get(heart, None)
542 eid = self.hearts.get(heart, None)
543 queue = self.engines[eid].queue
543 queue = self.engines[eid].queue
544 if eid is None or self.keytable[eid] in self.dead_engines:
544 if eid is None or self.keytable[eid] in self.dead_engines:
545 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
545 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
546 else:
546 else:
547 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
547 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
548
548
549 #----------------------- MUX Queue Traffic ------------------------------
549 #----------------------- MUX Queue Traffic ------------------------------
550
550
551 def save_queue_request(self, idents, msg):
551 def save_queue_request(self, idents, msg):
552 if len(idents) < 2:
552 if len(idents) < 2:
553 self.log.error("invalid identity prefix: %r", idents)
553 self.log.error("invalid identity prefix: %r", idents)
554 return
554 return
555 queue_id, client_id = idents[:2]
555 queue_id, client_id = idents[:2]
556 try:
556 try:
557 msg = self.session.unserialize(msg)
557 msg = self.session.unserialize(msg)
558 except Exception:
558 except Exception:
559 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
559 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
560 return
560 return
561
561
562 eid = self.by_ident.get(queue_id, None)
562 eid = self.by_ident.get(queue_id, None)
563 if eid is None:
563 if eid is None:
564 self.log.error("queue::target %r not registered", queue_id)
564 self.log.error("queue::target %r not registered", queue_id)
565 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
565 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
566 return
566 return
567 record = init_record(msg)
567 record = init_record(msg)
568 msg_id = record['msg_id']
568 msg_id = record['msg_id']
569 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
569 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
570 # Unicode in records
570 # Unicode in records
571 record['engine_uuid'] = queue_id.decode('ascii')
571 record['engine_uuid'] = queue_id.decode('ascii')
572 record['client_uuid'] = client_id.decode('ascii')
572 record['client_uuid'] = client_id.decode('ascii')
573 record['queue'] = 'mux'
573 record['queue'] = 'mux'
574
574
575 try:
575 try:
576 # it's posible iopub arrived first:
576 # it's posible iopub arrived first:
577 existing = self.db.get_record(msg_id)
577 existing = self.db.get_record(msg_id)
578 for key,evalue in existing.iteritems():
578 for key,evalue in existing.iteritems():
579 rvalue = record.get(key, None)
579 rvalue = record.get(key, None)
580 if evalue and rvalue and evalue != rvalue:
580 if evalue and rvalue and evalue != rvalue:
581 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
581 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
582 elif evalue and not rvalue:
582 elif evalue and not rvalue:
583 record[key] = evalue
583 record[key] = evalue
584 try:
584 try:
585 self.db.update_record(msg_id, record)
585 self.db.update_record(msg_id, record)
586 except Exception:
586 except Exception:
587 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
587 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
588 except KeyError:
588 except KeyError:
589 try:
589 try:
590 self.db.add_record(msg_id, record)
590 self.db.add_record(msg_id, record)
591 except Exception:
591 except Exception:
592 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
592 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
593
593
594
594
595 self.pending.add(msg_id)
595 self.pending.add(msg_id)
596 self.queues[eid].append(msg_id)
596 self.queues[eid].append(msg_id)
597
597
598 def save_queue_result(self, idents, msg):
598 def save_queue_result(self, idents, msg):
599 if len(idents) < 2:
599 if len(idents) < 2:
600 self.log.error("invalid identity prefix: %r", idents)
600 self.log.error("invalid identity prefix: %r", idents)
601 return
601 return
602
602
603 client_id, queue_id = idents[:2]
603 client_id, queue_id = idents[:2]
604 try:
604 try:
605 msg = self.session.unserialize(msg)
605 msg = self.session.unserialize(msg)
606 except Exception:
606 except Exception:
607 self.log.error("queue::engine %r sent invalid message to %r: %r",
607 self.log.error("queue::engine %r sent invalid message to %r: %r",
608 queue_id, client_id, msg, exc_info=True)
608 queue_id, client_id, msg, exc_info=True)
609 return
609 return
610
610
611 eid = self.by_ident.get(queue_id, None)
611 eid = self.by_ident.get(queue_id, None)
612 if eid is None:
612 if eid is None:
613 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
613 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
614 return
614 return
615
615
616 parent = msg['parent_header']
616 parent = msg['parent_header']
617 if not parent:
617 if not parent:
618 return
618 return
619 msg_id = parent['msg_id']
619 msg_id = parent['msg_id']
620 if msg_id in self.pending:
620 if msg_id in self.pending:
621 self.pending.remove(msg_id)
621 self.pending.remove(msg_id)
622 self.all_completed.add(msg_id)
622 self.all_completed.add(msg_id)
623 self.queues[eid].remove(msg_id)
623 self.queues[eid].remove(msg_id)
624 self.completed[eid].append(msg_id)
624 self.completed[eid].append(msg_id)
625 self.log.info("queue::request %r completed on %s", msg_id, eid)
625 self.log.info("queue::request %r completed on %s", msg_id, eid)
626 elif msg_id not in self.all_completed:
626 elif msg_id not in self.all_completed:
627 # it could be a result from a dead engine that died before delivering the
627 # it could be a result from a dead engine that died before delivering the
628 # result
628 # result
629 self.log.warn("queue:: unknown msg finished %r", msg_id)
629 self.log.warn("queue:: unknown msg finished %r", msg_id)
630 return
630 return
631 # update record anyway, because the unregistration could have been premature
631 # update record anyway, because the unregistration could have been premature
632 rheader = msg['header']
632 rheader = msg['header']
633 completed = rheader['date']
633 completed = rheader['date']
634 started = rheader.get('started', None)
634 started = rheader.get('started', None)
635 result = {
635 result = {
636 'result_header' : rheader,
636 'result_header' : rheader,
637 'result_content': msg['content'],
637 'result_content': msg['content'],
638 'received': datetime.now(),
638 'received': datetime.now(),
639 'started' : started,
639 'started' : started,
640 'completed' : completed
640 'completed' : completed
641 }
641 }
642
642
643 result['result_buffers'] = msg['buffers']
643 result['result_buffers'] = msg['buffers']
644 try:
644 try:
645 self.db.update_record(msg_id, result)
645 self.db.update_record(msg_id, result)
646 except Exception:
646 except Exception:
647 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
647 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
648
648
649
649
650 #--------------------- Task Queue Traffic ------------------------------
650 #--------------------- Task Queue Traffic ------------------------------
651
651
652 def save_task_request(self, idents, msg):
652 def save_task_request(self, idents, msg):
653 """Save the submission of a task."""
653 """Save the submission of a task."""
654 client_id = idents[0]
654 client_id = idents[0]
655
655
656 try:
656 try:
657 msg = self.session.unserialize(msg)
657 msg = self.session.unserialize(msg)
658 except Exception:
658 except Exception:
659 self.log.error("task::client %r sent invalid task message: %r",
659 self.log.error("task::client %r sent invalid task message: %r",
660 client_id, msg, exc_info=True)
660 client_id, msg, exc_info=True)
661 return
661 return
662 record = init_record(msg)
662 record = init_record(msg)
663
663
664 record['client_uuid'] = client_id.decode('ascii')
664 record['client_uuid'] = client_id.decode('ascii')
665 record['queue'] = 'task'
665 record['queue'] = 'task'
666 header = msg['header']
666 header = msg['header']
667 msg_id = header['msg_id']
667 msg_id = header['msg_id']
668 self.pending.add(msg_id)
668 self.pending.add(msg_id)
669 self.unassigned.add(msg_id)
669 self.unassigned.add(msg_id)
670 try:
670 try:
671 # it's posible iopub arrived first:
671 # it's posible iopub arrived first:
672 existing = self.db.get_record(msg_id)
672 existing = self.db.get_record(msg_id)
673 if existing['resubmitted']:
673 if existing['resubmitted']:
674 for key in ('submitted', 'client_uuid', 'buffers'):
674 for key in ('submitted', 'client_uuid', 'buffers'):
675 # don't clobber these keys on resubmit
675 # don't clobber these keys on resubmit
676 # submitted and client_uuid should be different
676 # submitted and client_uuid should be different
677 # and buffers might be big, and shouldn't have changed
677 # and buffers might be big, and shouldn't have changed
678 record.pop(key)
678 record.pop(key)
679 # still check content,header which should not change
679 # still check content,header which should not change
680 # but are not expensive to compare as buffers
680 # but are not expensive to compare as buffers
681
681
682 for key,evalue in existing.iteritems():
682 for key,evalue in existing.iteritems():
683 if key.endswith('buffers'):
683 if key.endswith('buffers'):
684 # don't compare buffers
684 # don't compare buffers
685 continue
685 continue
686 rvalue = record.get(key, None)
686 rvalue = record.get(key, None)
687 if evalue and rvalue and evalue != rvalue:
687 if evalue and rvalue and evalue != rvalue:
688 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
688 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
689 elif evalue and not rvalue:
689 elif evalue and not rvalue:
690 record[key] = evalue
690 record[key] = evalue
691 try:
691 try:
692 self.db.update_record(msg_id, record)
692 self.db.update_record(msg_id, record)
693 except Exception:
693 except Exception:
694 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
694 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
695 except KeyError:
695 except KeyError:
696 try:
696 try:
697 self.db.add_record(msg_id, record)
697 self.db.add_record(msg_id, record)
698 except Exception:
698 except Exception:
699 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
699 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
700 except Exception:
700 except Exception:
701 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
701 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
702
702
703 def save_task_result(self, idents, msg):
703 def save_task_result(self, idents, msg):
704 """save the result of a completed task."""
704 """save the result of a completed task."""
705 client_id = idents[0]
705 client_id = idents[0]
706 try:
706 try:
707 msg = self.session.unserialize(msg)
707 msg = self.session.unserialize(msg)
708 except Exception:
708 except Exception:
709 self.log.error("task::invalid task result message send to %r: %r",
709 self.log.error("task::invalid task result message send to %r: %r",
710 client_id, msg, exc_info=True)
710 client_id, msg, exc_info=True)
711 return
711 return
712
712
713 parent = msg['parent_header']
713 parent = msg['parent_header']
714 if not parent:
714 if not parent:
715 # print msg
715 # print msg
716 self.log.warn("Task %r had no parent!", msg)
716 self.log.warn("Task %r had no parent!", msg)
717 return
717 return
718 msg_id = parent['msg_id']
718 msg_id = parent['msg_id']
719 if msg_id in self.unassigned:
719 if msg_id in self.unassigned:
720 self.unassigned.remove(msg_id)
720 self.unassigned.remove(msg_id)
721
721
722 header = msg['header']
722 header = msg['header']
723 engine_uuid = header.get('engine', u'')
723 engine_uuid = header.get('engine', u'')
724 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
724 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
725
725
726 status = header.get('status', None)
726 status = header.get('status', None)
727
727
728 if msg_id in self.pending:
728 if msg_id in self.pending:
729 self.log.info("task::task %r finished on %s", msg_id, eid)
729 self.log.info("task::task %r finished on %s", msg_id, eid)
730 self.pending.remove(msg_id)
730 self.pending.remove(msg_id)
731 self.all_completed.add(msg_id)
731 self.all_completed.add(msg_id)
732 if eid is not None:
732 if eid is not None:
733 if status != 'aborted':
733 if status != 'aborted':
734 self.completed[eid].append(msg_id)
734 self.completed[eid].append(msg_id)
735 if msg_id in self.tasks[eid]:
735 if msg_id in self.tasks[eid]:
736 self.tasks[eid].remove(msg_id)
736 self.tasks[eid].remove(msg_id)
737 completed = header['date']
737 completed = header['date']
738 started = header.get('started', None)
738 started = header.get('started', None)
739 result = {
739 result = {
740 'result_header' : header,
740 'result_header' : header,
741 'result_content': msg['content'],
741 'result_content': msg['content'],
742 'started' : started,
742 'started' : started,
743 'completed' : completed,
743 'completed' : completed,
744 'received' : datetime.now(),
744 'received' : datetime.now(),
745 'engine_uuid': engine_uuid,
745 'engine_uuid': engine_uuid,
746 }
746 }
747
747
748 result['result_buffers'] = msg['buffers']
748 result['result_buffers'] = msg['buffers']
749 try:
749 try:
750 self.db.update_record(msg_id, result)
750 self.db.update_record(msg_id, result)
751 except Exception:
751 except Exception:
752 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
752 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
753
753
754 else:
754 else:
755 self.log.debug("task::unknown task %r finished", msg_id)
755 self.log.debug("task::unknown task %r finished", msg_id)
756
756
757 def save_task_destination(self, idents, msg):
757 def save_task_destination(self, idents, msg):
758 try:
758 try:
759 msg = self.session.unserialize(msg, content=True)
759 msg = self.session.unserialize(msg, content=True)
760 except Exception:
760 except Exception:
761 self.log.error("task::invalid task tracking message", exc_info=True)
761 self.log.error("task::invalid task tracking message", exc_info=True)
762 return
762 return
763 content = msg['content']
763 content = msg['content']
764 # print (content)
764 # print (content)
765 msg_id = content['msg_id']
765 msg_id = content['msg_id']
766 engine_uuid = content['engine_id']
766 engine_uuid = content['engine_id']
767 eid = self.by_ident[cast_bytes(engine_uuid)]
767 eid = self.by_ident[cast_bytes(engine_uuid)]
768
768
769 self.log.info("task::task %r arrived on %r", msg_id, eid)
769 self.log.info("task::task %r arrived on %r", msg_id, eid)
770 if msg_id in self.unassigned:
770 if msg_id in self.unassigned:
771 self.unassigned.remove(msg_id)
771 self.unassigned.remove(msg_id)
772 # else:
772 # else:
773 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
773 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
774
774
775 self.tasks[eid].append(msg_id)
775 self.tasks[eid].append(msg_id)
776 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
776 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
777 try:
777 try:
778 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
778 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
779 except Exception:
779 except Exception:
780 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
780 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
781
781
782
782
783 def mia_task_request(self, idents, msg):
783 def mia_task_request(self, idents, msg):
784 raise NotImplementedError
784 raise NotImplementedError
785 client_id = idents[0]
785 client_id = idents[0]
786 # content = dict(mia=self.mia,status='ok')
786 # content = dict(mia=self.mia,status='ok')
787 # self.session.send('mia_reply', content=content, idents=client_id)
787 # self.session.send('mia_reply', content=content, idents=client_id)
788
788
789
789
790 #--------------------- IOPub Traffic ------------------------------
790 #--------------------- IOPub Traffic ------------------------------
791
791
792 def save_iopub_message(self, topics, msg):
792 def save_iopub_message(self, topics, msg):
793 """save an iopub message into the db"""
793 """save an iopub message into the db"""
794 # print (topics)
794 # print (topics)
795 try:
795 try:
796 msg = self.session.unserialize(msg, content=True)
796 msg = self.session.unserialize(msg, content=True)
797 except Exception:
797 except Exception:
798 self.log.error("iopub::invalid IOPub message", exc_info=True)
798 self.log.error("iopub::invalid IOPub message", exc_info=True)
799 return
799 return
800
800
801 parent = msg['parent_header']
801 parent = msg['parent_header']
802 if not parent:
802 if not parent:
803 self.log.error("iopub::invalid IOPub message: %r", msg)
803 self.log.error("iopub::invalid IOPub message: %r", msg)
804 return
804 return
805 msg_id = parent['msg_id']
805 msg_id = parent['msg_id']
806 msg_type = msg['header']['msg_type']
806 msg_type = msg['header']['msg_type']
807 content = msg['content']
807 content = msg['content']
808
808
809 # ensure msg_id is in db
809 # ensure msg_id is in db
810 try:
810 try:
811 rec = self.db.get_record(msg_id)
811 rec = self.db.get_record(msg_id)
812 except KeyError:
812 except KeyError:
813 rec = empty_record()
813 rec = empty_record()
814 rec['msg_id'] = msg_id
814 rec['msg_id'] = msg_id
815 self.db.add_record(msg_id, rec)
815 self.db.add_record(msg_id, rec)
816 # stream
816 # stream
817 d = {}
817 d = {}
818 if msg_type == 'stream':
818 if msg_type == 'stream':
819 name = content['name']
819 name = content['name']
820 s = rec[name] or ''
820 s = rec[name] or ''
821 d[name] = s + content['data']
821 d[name] = s + content['data']
822
822
823 elif msg_type == 'pyerr':
823 elif msg_type == 'pyerr':
824 d['pyerr'] = content
824 d['pyerr'] = content
825 elif msg_type == 'pyin':
825 elif msg_type == 'pyin':
826 d['pyin'] = content['code']
826 d['pyin'] = content['code']
827 else:
827 else:
828 d[msg_type] = content.get('data', '')
828 d[msg_type] = content.get('data', '')
829
829
830 try:
830 try:
831 self.db.update_record(msg_id, d)
831 self.db.update_record(msg_id, d)
832 except Exception:
832 except Exception:
833 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
833 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
834
834
835
835
836
836
837 #-------------------------------------------------------------------------
837 #-------------------------------------------------------------------------
838 # Registration requests
838 # Registration requests
839 #-------------------------------------------------------------------------
839 #-------------------------------------------------------------------------
840
840
841 def connection_request(self, client_id, msg):
841 def connection_request(self, client_id, msg):
842 """Reply with connection addresses for clients."""
842 """Reply with connection addresses for clients."""
843 self.log.info("client::client %r connected", client_id)
843 self.log.info("client::client %r connected", client_id)
844 content = dict(status='ok')
844 content = dict(status='ok')
845 content.update(self.client_info)
845 content.update(self.client_info)
846 jsonable = {}
846 jsonable = {}
847 for k,v in self.keytable.iteritems():
847 for k,v in self.keytable.iteritems():
848 if v not in self.dead_engines:
848 if v not in self.dead_engines:
849 jsonable[str(k)] = v.decode('ascii')
849 jsonable[str(k)] = v.decode('ascii')
850 content['engines'] = jsonable
850 content['engines'] = jsonable
851 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
851 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
852
852
853 def register_engine(self, reg, msg):
853 def register_engine(self, reg, msg):
854 """Register a new engine."""
854 """Register a new engine."""
855 content = msg['content']
855 content = msg['content']
856 try:
856 try:
857 queue = cast_bytes(content['queue'])
857 queue = cast_bytes(content['queue'])
858 except KeyError:
858 except KeyError:
859 self.log.error("registration::queue not specified", exc_info=True)
859 self.log.error("registration::queue not specified", exc_info=True)
860 return
860 return
861 heart = content.get('heartbeat', None)
861 heart = content.get('heartbeat', None)
862 if heart:
862 if heart:
863 heart = cast_bytes(heart)
863 heart = cast_bytes(heart)
864 """register a new engine, and create the socket(s) necessary"""
864 """register a new engine, and create the socket(s) necessary"""
865 eid = self._next_id
865 eid = self._next_id
866 # print (eid, queue, reg, heart)
866 # print (eid, queue, reg, heart)
867
867
868 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
868 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
869
869
870 content = dict(id=eid,status='ok')
870 content = dict(id=eid,status='ok')
871 content.update(self.engine_info)
871 content.update(self.engine_info)
872 # check if requesting available IDs:
872 # check if requesting available IDs:
873 if queue in self.by_ident:
873 if queue in self.by_ident:
874 try:
874 try:
875 raise KeyError("queue_id %r in use" % queue)
875 raise KeyError("queue_id %r in use" % queue)
876 except:
876 except:
877 content = error.wrap_exception()
877 content = error.wrap_exception()
878 self.log.error("queue_id %r in use", queue, exc_info=True)
878 self.log.error("queue_id %r in use", queue, exc_info=True)
879 elif heart in self.hearts: # need to check unique hearts?
879 elif heart in self.hearts: # need to check unique hearts?
880 try:
880 try:
881 raise KeyError("heart_id %r in use" % heart)
881 raise KeyError("heart_id %r in use" % heart)
882 except:
882 except:
883 self.log.error("heart_id %r in use", heart, exc_info=True)
883 self.log.error("heart_id %r in use", heart, exc_info=True)
884 content = error.wrap_exception()
884 content = error.wrap_exception()
885 else:
885 else:
886 for h, pack in self.incoming_registrations.iteritems():
886 for h, pack in self.incoming_registrations.iteritems():
887 if heart == h:
887 if heart == h:
888 try:
888 try:
889 raise KeyError("heart_id %r in use" % heart)
889 raise KeyError("heart_id %r in use" % heart)
890 except:
890 except:
891 self.log.error("heart_id %r in use", heart, exc_info=True)
891 self.log.error("heart_id %r in use", heart, exc_info=True)
892 content = error.wrap_exception()
892 content = error.wrap_exception()
893 break
893 break
894 elif queue == pack[1]:
894 elif queue == pack[1]:
895 try:
895 try:
896 raise KeyError("queue_id %r in use" % queue)
896 raise KeyError("queue_id %r in use" % queue)
897 except:
897 except:
898 self.log.error("queue_id %r in use", queue, exc_info=True)
898 self.log.error("queue_id %r in use", queue, exc_info=True)
899 content = error.wrap_exception()
899 content = error.wrap_exception()
900 break
900 break
901
901
902 msg = self.session.send(self.query, "registration_reply",
902 msg = self.session.send(self.query, "registration_reply",
903 content=content,
903 content=content,
904 ident=reg)
904 ident=reg)
905
905
906 if content['status'] == 'ok':
906 if content['status'] == 'ok':
907 if heart in self.heartmonitor.hearts:
907 if heart in self.heartmonitor.hearts:
908 # already beating
908 # already beating
909 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
909 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
910 self.finish_registration(heart)
910 self.finish_registration(heart)
911 else:
911 else:
912 purge = lambda : self._purge_stalled_registration(heart)
912 purge = lambda : self._purge_stalled_registration(heart)
913 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
913 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
914 dc.start()
914 dc.start()
915 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
915 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
916 else:
916 else:
917 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
917 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
918 return eid
918 return eid
919
919
920 def unregister_engine(self, ident, msg):
920 def unregister_engine(self, ident, msg):
921 """Unregister an engine that explicitly requested to leave."""
921 """Unregister an engine that explicitly requested to leave."""
922 try:
922 try:
923 eid = msg['content']['id']
923 eid = msg['content']['id']
924 except:
924 except:
925 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
925 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
926 return
926 return
927 self.log.info("registration::unregister_engine(%r)", eid)
927 self.log.info("registration::unregister_engine(%r)", eid)
928 # print (eid)
928 # print (eid)
929 uuid = self.keytable[eid]
929 uuid = self.keytable[eid]
930 content=dict(id=eid, queue=uuid.decode('ascii'))
930 content=dict(id=eid, queue=uuid.decode('ascii'))
931 self.dead_engines.add(uuid)
931 self.dead_engines.add(uuid)
932 # self.ids.remove(eid)
932 # self.ids.remove(eid)
933 # uuid = self.keytable.pop(eid)
933 # uuid = self.keytable.pop(eid)
934 #
934 #
935 # ec = self.engines.pop(eid)
935 # ec = self.engines.pop(eid)
936 # self.hearts.pop(ec.heartbeat)
936 # self.hearts.pop(ec.heartbeat)
937 # self.by_ident.pop(ec.queue)
937 # self.by_ident.pop(ec.queue)
938 # self.completed.pop(eid)
938 # self.completed.pop(eid)
939 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
939 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
940 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
940 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
941 dc.start()
941 dc.start()
942 ############## TODO: HANDLE IT ################
942 ############## TODO: HANDLE IT ################
943
943
944 if self.notifier:
944 if self.notifier:
945 self.session.send(self.notifier, "unregistration_notification", content=content)
945 self.session.send(self.notifier, "unregistration_notification", content=content)
946
946
947 def _handle_stranded_msgs(self, eid, uuid):
947 def _handle_stranded_msgs(self, eid, uuid):
948 """Handle messages known to be on an engine when the engine unregisters.
948 """Handle messages known to be on an engine when the engine unregisters.
949
949
950 It is possible that this will fire prematurely - that is, an engine will
950 It is possible that this will fire prematurely - that is, an engine will
951 go down after completing a result, and the client will be notified
951 go down after completing a result, and the client will be notified
952 that the result failed and later receive the actual result.
952 that the result failed and later receive the actual result.
953 """
953 """
954
954
955 outstanding = self.queues[eid]
955 outstanding = self.queues[eid]
956
956
957 for msg_id in outstanding:
957 for msg_id in outstanding:
958 self.pending.remove(msg_id)
958 self.pending.remove(msg_id)
959 self.all_completed.add(msg_id)
959 self.all_completed.add(msg_id)
960 try:
960 try:
961 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
961 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
962 except:
962 except:
963 content = error.wrap_exception()
963 content = error.wrap_exception()
964 # build a fake header:
964 # build a fake header:
965 header = {}
965 header = {}
966 header['engine'] = uuid
966 header['engine'] = uuid
967 header['date'] = datetime.now()
967 header['date'] = datetime.now()
968 rec = dict(result_content=content, result_header=header, result_buffers=[])
968 rec = dict(result_content=content, result_header=header, result_buffers=[])
969 rec['completed'] = header['date']
969 rec['completed'] = header['date']
970 rec['engine_uuid'] = uuid
970 rec['engine_uuid'] = uuid
971 try:
971 try:
972 self.db.update_record(msg_id, rec)
972 self.db.update_record(msg_id, rec)
973 except Exception:
973 except Exception:
974 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
974 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
975
975
976
976
977 def finish_registration(self, heart):
977 def finish_registration(self, heart):
978 """Second half of engine registration, called after our HeartMonitor
978 """Second half of engine registration, called after our HeartMonitor
979 has received a beat from the Engine's Heart."""
979 has received a beat from the Engine's Heart."""
980 try:
980 try:
981 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
981 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
982 except KeyError:
982 except KeyError:
983 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
983 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
984 return
984 return
985 self.log.info("registration::finished registering engine %i:%r", eid, queue)
985 self.log.info("registration::finished registering engine %i:%r", eid, queue)
986 if purge is not None:
986 if purge is not None:
987 purge.stop()
987 purge.stop()
988 control = queue
988 control = queue
989 self.ids.add(eid)
989 self.ids.add(eid)
990 self.keytable[eid] = queue
990 self.keytable[eid] = queue
991 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
991 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
992 control=control, heartbeat=heart)
992 control=control, heartbeat=heart)
993 self.by_ident[queue] = eid
993 self.by_ident[queue] = eid
994 self.queues[eid] = list()
994 self.queues[eid] = list()
995 self.tasks[eid] = list()
995 self.tasks[eid] = list()
996 self.completed[eid] = list()
996 self.completed[eid] = list()
997 self.hearts[heart] = eid
997 self.hearts[heart] = eid
998 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
998 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
999 if self.notifier:
999 if self.notifier:
1000 self.session.send(self.notifier, "registration_notification", content=content)
1000 self.session.send(self.notifier, "registration_notification", content=content)
1001 self.log.info("engine::Engine Connected: %i", eid)
1001 self.log.info("engine::Engine Connected: %i", eid)
1002
1002
1003 def _purge_stalled_registration(self, heart):
1003 def _purge_stalled_registration(self, heart):
1004 if heart in self.incoming_registrations:
1004 if heart in self.incoming_registrations:
1005 eid = self.incoming_registrations.pop(heart)[0]
1005 eid = self.incoming_registrations.pop(heart)[0]
1006 self.log.info("registration::purging stalled registration: %i", eid)
1006 self.log.info("registration::purging stalled registration: %i", eid)
1007 else:
1007 else:
1008 pass
1008 pass
1009
1009
    #-------------------------------------------------------------------------
    # Client Requests
    #-------------------------------------------------------------------------
1014 def shutdown_request(self, client_id, msg):
1014 def shutdown_request(self, client_id, msg):
1015 """handle shutdown request."""
1015 """handle shutdown request."""
1016 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1016 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1017 # also notify other clients of shutdown
1017 # also notify other clients of shutdown
1018 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1018 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1019 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1019 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1020 dc.start()
1020 dc.start()
1021
1021
1022 def _shutdown(self):
1022 def _shutdown(self):
1023 self.log.info("hub::hub shutting down.")
1023 self.log.info("hub::hub shutting down.")
1024 time.sleep(0.1)
1024 time.sleep(0.1)
1025 sys.exit(0)
1025 sys.exit(0)
1026
1026
1027
1027
1028 def check_load(self, client_id, msg):
1028 def check_load(self, client_id, msg):
1029 content = msg['content']
1029 content = msg['content']
1030 try:
1030 try:
1031 targets = content['targets']
1031 targets = content['targets']
1032 targets = self._validate_targets(targets)
1032 targets = self._validate_targets(targets)
1033 except:
1033 except:
1034 content = error.wrap_exception()
1034 content = error.wrap_exception()
1035 self.session.send(self.query, "hub_error",
1035 self.session.send(self.query, "hub_error",
1036 content=content, ident=client_id)
1036 content=content, ident=client_id)
1037 return
1037 return
1038
1038
1039 content = dict(status='ok')
1039 content = dict(status='ok')
1040 # loads = {}
1040 # loads = {}
1041 for t in targets:
1041 for t in targets:
1042 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1042 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1043 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1043 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1044
1044
1045
1045
1046 def queue_status(self, client_id, msg):
1046 def queue_status(self, client_id, msg):
1047 """Return the Queue status of one or more targets.
1047 """Return the Queue status of one or more targets.
1048 if verbose: return the msg_ids
1048 if verbose: return the msg_ids
1049 else: return len of each type.
1049 else: return len of each type.
1050 keys: queue (pending MUX jobs)
1050 keys: queue (pending MUX jobs)
1051 tasks (pending Task jobs)
1051 tasks (pending Task jobs)
1052 completed (finished jobs from both queues)"""
1052 completed (finished jobs from both queues)"""
1053 content = msg['content']
1053 content = msg['content']
1054 targets = content['targets']
1054 targets = content['targets']
1055 try:
1055 try:
1056 targets = self._validate_targets(targets)
1056 targets = self._validate_targets(targets)
1057 except:
1057 except:
1058 content = error.wrap_exception()
1058 content = error.wrap_exception()
1059 self.session.send(self.query, "hub_error",
1059 self.session.send(self.query, "hub_error",
1060 content=content, ident=client_id)
1060 content=content, ident=client_id)
1061 return
1061 return
1062 verbose = content.get('verbose', False)
1062 verbose = content.get('verbose', False)
1063 content = dict(status='ok')
1063 content = dict(status='ok')
1064 for t in targets:
1064 for t in targets:
1065 queue = self.queues[t]
1065 queue = self.queues[t]
1066 completed = self.completed[t]
1066 completed = self.completed[t]
1067 tasks = self.tasks[t]
1067 tasks = self.tasks[t]
1068 if not verbose:
1068 if not verbose:
1069 queue = len(queue)
1069 queue = len(queue)
1070 completed = len(completed)
1070 completed = len(completed)
1071 tasks = len(tasks)
1071 tasks = len(tasks)
1072 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1072 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1073 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1073 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1074 # print (content)
1074 # print (content)
1075 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1075 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1076
1076
1077 def purge_results(self, client_id, msg):
1077 def purge_results(self, client_id, msg):
1078 """Purge results from memory. This method is more valuable before we move
1078 """Purge results from memory. This method is more valuable before we move
1079 to a DB based message storage mechanism."""
1079 to a DB based message storage mechanism."""
1080 content = msg['content']
1080 content = msg['content']
1081 self.log.info("Dropping records with %s", content)
1081 self.log.info("Dropping records with %s", content)
1082 msg_ids = content.get('msg_ids', [])
1082 msg_ids = content.get('msg_ids', [])
1083 reply = dict(status='ok')
1083 reply = dict(status='ok')
1084 if msg_ids == 'all':
1084 if msg_ids == 'all':
1085 try:
1085 try:
1086 self.db.drop_matching_records(dict(completed={'$ne':None}))
1086 self.db.drop_matching_records(dict(completed={'$ne':None}))
1087 except Exception:
1087 except Exception:
1088 reply = error.wrap_exception()
1088 reply = error.wrap_exception()
1089 else:
1089 else:
1090 pending = filter(lambda m: m in self.pending, msg_ids)
1090 pending = filter(lambda m: m in self.pending, msg_ids)
1091 if pending:
1091 if pending:
1092 try:
1092 try:
1093 raise IndexError("msg pending: %r" % pending[0])
1093 raise IndexError("msg pending: %r" % pending[0])
1094 except:
1094 except:
1095 reply = error.wrap_exception()
1095 reply = error.wrap_exception()
1096 else:
1096 else:
1097 try:
1097 try:
1098 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1098 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1099 except Exception:
1099 except Exception:
1100 reply = error.wrap_exception()
1100 reply = error.wrap_exception()
1101
1101
1102 if reply['status'] == 'ok':
1102 if reply['status'] == 'ok':
1103 eids = content.get('engine_ids', [])
1103 eids = content.get('engine_ids', [])
1104 for eid in eids:
1104 for eid in eids:
1105 if eid not in self.engines:
1105 if eid not in self.engines:
1106 try:
1106 try:
1107 raise IndexError("No such engine: %i" % eid)
1107 raise IndexError("No such engine: %i" % eid)
1108 except:
1108 except:
1109 reply = error.wrap_exception()
1109 reply = error.wrap_exception()
1110 break
1110 break
1111 uid = self.engines[eid].queue
1111 uid = self.engines[eid].queue
1112 try:
1112 try:
1113 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1113 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1114 except Exception:
1114 except Exception:
1115 reply = error.wrap_exception()
1115 reply = error.wrap_exception()
1116 break
1116 break
1117
1117
1118 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1118 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1119
1119
1120 def resubmit_task(self, client_id, msg):
1120 def resubmit_task(self, client_id, msg):
1121 """Resubmit one or more tasks."""
1121 """Resubmit one or more tasks."""
1122 def finish(reply):
1122 def finish(reply):
1123 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1123 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1124
1124
1125 content = msg['content']
1125 content = msg['content']
1126 msg_ids = content['msg_ids']
1126 msg_ids = content['msg_ids']
1127 reply = dict(status='ok')
1127 reply = dict(status='ok')
1128 try:
1128 try:
1129 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1129 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1130 'header', 'content', 'buffers'])
1130 'header', 'content', 'buffers'])
1131 except Exception:
1131 except Exception:
1132 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1132 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1133 return finish(error.wrap_exception())
1133 return finish(error.wrap_exception())
1134
1134
1135 # validate msg_ids
1135 # validate msg_ids
1136 found_ids = [ rec['msg_id'] for rec in records ]
1136 found_ids = [ rec['msg_id'] for rec in records ]
1137 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1137 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1138 if len(records) > len(msg_ids):
1138 if len(records) > len(msg_ids):
1139 try:
1139 try:
1140 raise RuntimeError("DB appears to be in an inconsistent state."
1140 raise RuntimeError("DB appears to be in an inconsistent state."
1141 "More matching records were found than should exist")
1141 "More matching records were found than should exist")
1142 except Exception:
1142 except Exception:
1143 return finish(error.wrap_exception())
1143 return finish(error.wrap_exception())
1144 elif len(records) < len(msg_ids):
1144 elif len(records) < len(msg_ids):
1145 missing = [ m for m in msg_ids if m not in found_ids ]
1145 missing = [ m for m in msg_ids if m not in found_ids ]
1146 try:
1146 try:
1147 raise KeyError("No such msg(s): %r" % missing)
1147 raise KeyError("No such msg(s): %r" % missing)
1148 except KeyError:
1148 except KeyError:
1149 return finish(error.wrap_exception())
1149 return finish(error.wrap_exception())
1150 elif invalid_ids:
1150 elif pending_ids:
1151 msg_id = invalid_ids[0]
1151 pass
1152 # no need to raise on resubmit of pending task, now that we
1153 # resubmit under new ID, but do we want to raise anyway?
1154 # msg_id = invalid_ids[0]
1155 # try:
1156 # raise ValueError("Task(s) %r appears to be inflight" % )
1157 # except Exception:
1158 # return finish(error.wrap_exception())
1159
1160 # mapping of original IDs to resubmitted IDs
1161 resubmitted = {}
1162
1163 # send the messages
1164 for rec in records:
1165 header = rec['header']
1166 msg = self.session.msg(header['msg_type'])
1167 msg_id = msg['msg_id']
1168 msg['content'] = rec['content']
1169 header.update(msg['header'])
1170 msg['header'] = header
1171
1172 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1173
1174 resubmitted[rec['msg_id']] = msg_id
1175 self.pending.add(msg_id)
1176 msg['buffers'] = []
1152 try:
1177 try:
1153 raise ValueError("Task %r appears to be inflight" % msg_id)
1178 self.db.add_record(msg_id, init_record(msg))
1154 except Exception:
1179 except Exception:
1155 return finish(error.wrap_exception())
1180 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1156
1181
1157 # clear the existing records
1182 finish(dict(status='ok', resubmitted=resubmitted))
1158 now = datetime.now()
1183
1159 rec = empty_record()
1184 # store the new IDs in the Task DB
1160 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1185 for msg_id, resubmit_id in resubmitted.iteritems():
1161 rec['resubmitted'] = now
1186 try:
1162 rec['queue'] = 'task'
1187 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1163 rec['client_uuid'] = client_id[0]
1188 except Exception:
1164 try:
1189 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1165 for msg_id in msg_ids:
1166 self.all_completed.discard(msg_id)
1167 self.db.update_record(msg_id, rec)
1168 except Exception:
1169 self.log.error('db::db error upating record', exc_info=True)
1170 reply = error.wrap_exception()
1171 else:
1172 # send the messages
1173 for rec in records:
1174 header = rec['header']
1175 # include resubmitted in header to prevent digest collision
1176 header['resubmitted'] = now
1177 msg = self.session.msg(header['msg_type'])
1178 msg['content'] = rec['content']
1179 msg['header'] = header
1180 msg['header']['msg_id'] = rec['msg_id']
1181 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1182
1183 finish(dict(status='ok'))
1184
1190
1185
1191
1186 def _extract_record(self, rec):
1192 def _extract_record(self, rec):
1187 """decompose a TaskRecord dict into subsection of reply for get_result"""
1193 """decompose a TaskRecord dict into subsection of reply for get_result"""
1188 io_dict = {}
1194 io_dict = {}
1189 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1195 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1190 io_dict[key] = rec[key]
1196 io_dict[key] = rec[key]
1191 content = { 'result_content': rec['result_content'],
1197 content = { 'result_content': rec['result_content'],
1192 'header': rec['header'],
1198 'header': rec['header'],
1193 'result_header' : rec['result_header'],
1199 'result_header' : rec['result_header'],
1194 'received' : rec['received'],
1200 'received' : rec['received'],
1195 'io' : io_dict,
1201 'io' : io_dict,
1196 }
1202 }
1197 if rec['result_buffers']:
1203 if rec['result_buffers']:
1198 buffers = map(bytes, rec['result_buffers'])
1204 buffers = map(bytes, rec['result_buffers'])
1199 else:
1205 else:
1200 buffers = []
1206 buffers = []
1201
1207
1202 return content, buffers
1208 return content, buffers
1203
1209
1204 def get_results(self, client_id, msg):
1210 def get_results(self, client_id, msg):
1205 """Get the result of 1 or more messages."""
1211 """Get the result of 1 or more messages."""
1206 content = msg['content']
1212 content = msg['content']
1207 msg_ids = sorted(set(content['msg_ids']))
1213 msg_ids = sorted(set(content['msg_ids']))
1208 statusonly = content.get('status_only', False)
1214 statusonly = content.get('status_only', False)
1209 pending = []
1215 pending = []
1210 completed = []
1216 completed = []
1211 content = dict(status='ok')
1217 content = dict(status='ok')
1212 content['pending'] = pending
1218 content['pending'] = pending
1213 content['completed'] = completed
1219 content['completed'] = completed
1214 buffers = []
1220 buffers = []
1215 if not statusonly:
1221 if not statusonly:
1216 try:
1222 try:
1217 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1223 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1218 # turn match list into dict, for faster lookup
1224 # turn match list into dict, for faster lookup
1219 records = {}
1225 records = {}
1220 for rec in matches:
1226 for rec in matches:
1221 records[rec['msg_id']] = rec
1227 records[rec['msg_id']] = rec
1222 except Exception:
1228 except Exception:
1223 content = error.wrap_exception()
1229 content = error.wrap_exception()
1224 self.session.send(self.query, "result_reply", content=content,
1230 self.session.send(self.query, "result_reply", content=content,
1225 parent=msg, ident=client_id)
1231 parent=msg, ident=client_id)
1226 return
1232 return
1227 else:
1233 else:
1228 records = {}
1234 records = {}
1229 for msg_id in msg_ids:
1235 for msg_id in msg_ids:
1230 if msg_id in self.pending:
1236 if msg_id in self.pending:
1231 pending.append(msg_id)
1237 pending.append(msg_id)
1232 elif msg_id in self.all_completed:
1238 elif msg_id in self.all_completed:
1233 completed.append(msg_id)
1239 completed.append(msg_id)
1234 if not statusonly:
1240 if not statusonly:
1235 c,bufs = self._extract_record(records[msg_id])
1241 c,bufs = self._extract_record(records[msg_id])
1236 content[msg_id] = c
1242 content[msg_id] = c
1237 buffers.extend(bufs)
1243 buffers.extend(bufs)
1238 elif msg_id in records:
1244 elif msg_id in records:
1239 if rec['completed']:
1245 if rec['completed']:
1240 completed.append(msg_id)
1246 completed.append(msg_id)
1241 c,bufs = self._extract_record(records[msg_id])
1247 c,bufs = self._extract_record(records[msg_id])
1242 content[msg_id] = c
1248 content[msg_id] = c
1243 buffers.extend(bufs)
1249 buffers.extend(bufs)
1244 else:
1250 else:
1245 pending.append(msg_id)
1251 pending.append(msg_id)
1246 else:
1252 else:
1247 try:
1253 try:
1248 raise KeyError('No such message: '+msg_id)
1254 raise KeyError('No such message: '+msg_id)
1249 except:
1255 except:
1250 content = error.wrap_exception()
1256 content = error.wrap_exception()
1251 break
1257 break
1252 self.session.send(self.query, "result_reply", content=content,
1258 self.session.send(self.query, "result_reply", content=content,
1253 parent=msg, ident=client_id,
1259 parent=msg, ident=client_id,
1254 buffers=buffers)
1260 buffers=buffers)
1255
1261
1256 def get_history(self, client_id, msg):
1262 def get_history(self, client_id, msg):
1257 """Get a list of all msg_ids in our DB records"""
1263 """Get a list of all msg_ids in our DB records"""
1258 try:
1264 try:
1259 msg_ids = self.db.get_history()
1265 msg_ids = self.db.get_history()
1260 except Exception as e:
1266 except Exception as e:
1261 content = error.wrap_exception()
1267 content = error.wrap_exception()
1262 else:
1268 else:
1263 content = dict(status='ok', history=msg_ids)
1269 content = dict(status='ok', history=msg_ids)
1264
1270
1265 self.session.send(self.query, "history_reply", content=content,
1271 self.session.send(self.query, "history_reply", content=content,
1266 parent=msg, ident=client_id)
1272 parent=msg, ident=client_id)
1267
1273
1268 def db_query(self, client_id, msg):
1274 def db_query(self, client_id, msg):
1269 """Perform a raw query on the task record database."""
1275 """Perform a raw query on the task record database."""
1270 content = msg['content']
1276 content = msg['content']
1271 query = content.get('query', {})
1277 query = content.get('query', {})
1272 keys = content.get('keys', None)
1278 keys = content.get('keys', None)
1273 buffers = []
1279 buffers = []
1274 empty = list()
1280 empty = list()
1275 try:
1281 try:
1276 records = self.db.find_records(query, keys)
1282 records = self.db.find_records(query, keys)
1277 except Exception as e:
1283 except Exception as e:
1278 content = error.wrap_exception()
1284 content = error.wrap_exception()
1279 else:
1285 else:
1280 # extract buffers from reply content:
1286 # extract buffers from reply content:
1281 if keys is not None:
1287 if keys is not None:
1282 buffer_lens = [] if 'buffers' in keys else None
1288 buffer_lens = [] if 'buffers' in keys else None
1283 result_buffer_lens = [] if 'result_buffers' in keys else None
1289 result_buffer_lens = [] if 'result_buffers' in keys else None
1284 else:
1290 else:
1285 buffer_lens = None
1291 buffer_lens = None
1286 result_buffer_lens = None
1292 result_buffer_lens = None
1287
1293
1288 for rec in records:
1294 for rec in records:
1289 # buffers may be None, so double check
1295 # buffers may be None, so double check
1290 b = rec.pop('buffers', empty) or empty
1296 b = rec.pop('buffers', empty) or empty
1291 if buffer_lens is not None:
1297 if buffer_lens is not None:
1292 buffer_lens.append(len(b))
1298 buffer_lens.append(len(b))
1293 buffers.extend(b)
1299 buffers.extend(b)
1294 rb = rec.pop('result_buffers', empty) or empty
1300 rb = rec.pop('result_buffers', empty) or empty
1295 if result_buffer_lens is not None:
1301 if result_buffer_lens is not None:
1296 result_buffer_lens.append(len(rb))
1302 result_buffer_lens.append(len(rb))
1297 buffers.extend(rb)
1303 buffers.extend(rb)
1298 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1304 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1299 result_buffer_lens=result_buffer_lens)
1305 result_buffer_lens=result_buffer_lens)
1300 # self.log.debug (content)
1306 # self.log.debug (content)
1301 self.session.send(self.query, "db_reply", content=content,
1307 self.session.send(self.query, "db_reply", content=content,
1302 parent=msg, ident=client_id,
1308 parent=msg, ident=client_id,
1303 buffers=buffers)
1309 buffers=buffers)
1304
1310
@@ -1,412 +1,412 b''
1 """A TaskRecord backend using sqlite3
1 """A TaskRecord backend using sqlite3
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2011 The IPython Development Team
8 # Copyright (C) 2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 import json
14 import json
15 import os
15 import os
16 import cPickle as pickle
16 import cPickle as pickle
17 from datetime import datetime
17 from datetime import datetime
18
18
19 try:
19 try:
20 import sqlite3
20 import sqlite3
21 except ImportError:
21 except ImportError:
22 sqlite3 = None
22 sqlite3 = None
23
23
24 from zmq.eventloop import ioloop
24 from zmq.eventloop import ioloop
25
25
26 from IPython.utils.traitlets import Unicode, Instance, List, Dict
26 from IPython.utils.traitlets import Unicode, Instance, List, Dict
27 from .dictdb import BaseDB
27 from .dictdb import BaseDB
28 from IPython.utils.jsonutil import date_default, extract_dates, squash_dates
28 from IPython.utils.jsonutil import date_default, extract_dates, squash_dates
29
29
30 #-----------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
31 # SQLite operators, adapters, and converters
31 # SQLite operators, adapters, and converters
32 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
33
33
# Provide a name ``buffer`` on every Python version: py3 removed the
# builtin, so fall back to ``memoryview`` there.
try:
    buffer
except NameError:
    # py3k
    buffer = memoryview

# Mapping from mongodb-style test operators to their SQL spellings.
# Tuple values are (operator, joiner) pairs: list-valued tests such as
# $in expand into several comparisons joined by the second element.
operators = {
    '$lt' : "<",
    '$gt' : ">",
    # null is handled weird with ==,!=
    '$eq' : "=",
    '$ne' : "!=",
    '$lte': "<=",
    '$gte': ">=",
    '$in' : ('=', ' OR '),
    '$nin': ('!=', ' AND '),
    # '$all': None,
    # '$mod': None,
    # '$exists' : None
}

# SQL cannot compare against NULL with = / !=; substitute these forms.
null_operators = {
    '=' : "IS NULL",
    '!=' : "IS NOT NULL",
}
58
58
def _adapt_dict(d):
    """sqlite adapter: store a dict as its JSON text representation."""
    serialized = json.dumps(d, default=date_default)
    return serialized
61
61
def _convert_dict(ds):
    """sqlite converter: parse stored JSON text back into a dict.

    ``None`` passes through untouched; datetime-like strings inside the
    parsed structure are revived via ``extract_dates``.
    """
    if ds is None:
        return ds
    if isinstance(ds, bytes):
        # If I understand the sqlite doc correctly, this will always be utf8
        ds = ds.decode('utf8')
    parsed = json.loads(ds)
    return extract_dates(parsed)
70
70
71 def _adapt_bufs(bufs):
71 def _adapt_bufs(bufs):
72 # this is *horrible*
72 # this is *horrible*
73 # copy buffers into single list and pickle it:
73 # copy buffers into single list and pickle it:
74 if bufs and isinstance(bufs[0], (bytes, buffer)):
74 if bufs and isinstance(bufs[0], (bytes, buffer)):
75 return sqlite3.Binary(pickle.dumps(map(bytes, bufs),-1))
75 return sqlite3.Binary(pickle.dumps(map(bytes, bufs),-1))
76 elif bufs:
76 elif bufs:
77 return bufs
77 return bufs
78 else:
78 else:
79 return None
79 return None
80
80
81 def _convert_bufs(bs):
81 def _convert_bufs(bs):
82 if bs is None:
82 if bs is None:
83 return []
83 return []
84 else:
84 else:
85 return pickle.loads(bytes(bs))
85 return pickle.loads(bytes(bs))
86
86
87 #-----------------------------------------------------------------------------
87 #-----------------------------------------------------------------------------
88 # SQLiteDB class
88 # SQLiteDB class
89 #-----------------------------------------------------------------------------
89 #-----------------------------------------------------------------------------
90
90
class SQLiteDB(BaseDB):
    """SQLite3 TaskRecord backend.

    Stores one row per task in a single table.  Registered sqlite
    adapters/converters map dicts to JSON text columns ('dict text') and
    buffer lists to pickled BLOB columns ('bufs blob').
    """

    filename = Unicode('tasks.db', config=True,
        help="""The filename of the sqlite task database. [default: 'tasks.db']""")
    location = Unicode('', config=True,
        help="""The directory containing the sqlite task database. The default
        is to use the cluster_dir location.""")
    table = Unicode("", config=True,
        help="""The SQLite Table to use for storing tasks for this session. If unspecified,
        a new table will be created with the Hub's IDENT. Specifying the table will result
        in tasks from previous sessions being available via Clients' db_query and
        get_result methods.""")

    if sqlite3 is not None:
        _db = Instance('sqlite3.Connection')
    else:
        _db = None
    # the ordered list of column names
    _keys = List(['msg_id',
                  'header',
                  'content',
                  'buffers',
                  'submitted',
                  'client_uuid',
                  'engine_uuid',
                  'started',
                  'completed',
                  'resubmitted',
                  'received',
                  'result_header',
                  'result_content',
                  'result_buffers',
                  'queue',
                  'pyin',
                  'pyout',
                  'pyerr',
                  'stdout',
                  'stderr',
                  ])
    # sqlite column declarations, used for checking that an existing
    # table matches the current db format
    _types = Dict({'msg_id' : 'text',
                   'header' : 'dict text',
                   'content' : 'dict text',
                   'buffers' : 'bufs blob',
                   'submitted' : 'timestamp',
                   'client_uuid' : 'text',
                   'engine_uuid' : 'text',
                   'started' : 'timestamp',
                   'completed' : 'timestamp',
                   'resubmitted' : 'text',
                   'received' : 'timestamp',
                   'result_header' : 'dict text',
                   'result_content' : 'dict text',
                   'result_buffers' : 'bufs blob',
                   'queue' : 'text',
                   'pyin' : 'text',
                   'pyout' : 'text',
                   'pyerr' : 'text',
                   'stdout' : 'text',
                   'stderr' : 'text',
                   })

    def __init__(self, **kwargs):
        super(SQLiteDB, self).__init__(**kwargs)
        if sqlite3 is None:
            raise ImportError("SQLiteDB requires sqlite3")
        if not self.table:
            # use session, and prefix _, since starting with # is illegal
            self.table = '_'+self.session.replace('-','_')
        if not self.location:
            # get current profile
            from IPython.core.application import BaseIPythonApplication
            if BaseIPythonApplication.initialized():
                app = BaseIPythonApplication.instance()
                if app.profile_dir is not None:
                    self.location = app.profile_dir.location
                else:
                    self.location = u'.'
            else:
                self.location = u'.'
        self._init_db()

        # register db commit as 2s periodic callback
        # to prevent clogging pipes
        # assumes we are being run in a zmq ioloop app
        loop = ioloop.IOLoop.instance()
        pc = ioloop.PeriodicCallback(self._db.commit, 2000, loop)
        pc.start()

    def _defaults(self, keys=None):
        """create an empty record (every column mapped to None)"""
        d = {}
        keys = self._keys if keys is None else keys
        for key in keys:
            d[key] = None
        return d

    def _check_table(self):
        """Ensure that an incorrect table doesn't exist

        If a bad (old) table does exist, return False
        """
        cursor = self._db.execute("PRAGMA table_info(%s)"%self.table)
        lines = cursor.fetchall()
        if not lines:
            # table does not exist
            return True
        types = {}
        keys = []
        for line in lines:
            # PRAGMA table_info rows are (cid, name, type, ...)
            keys.append(line[1])
            types[line[1]] = line[2]
        if self._keys != keys:
            # key mismatch
            self.log.warn('keys mismatch')
            return False
        for key in self._keys:
            if types[key] != self._types[key]:
                self.log.warn(
                    'type mismatch: %s: %s != %s'%(key,types[key],self._types[key])
                )
                return False
        return True

    def _init_db(self):
        """Connect to the database and get new session number."""
        # register adapters
        sqlite3.register_adapter(dict, _adapt_dict)
        sqlite3.register_converter('dict', _convert_dict)
        sqlite3.register_adapter(list, _adapt_bufs)
        sqlite3.register_converter('bufs', _convert_bufs)
        # connect to the db
        dbfile = os.path.join(self.location, self.filename)
        self._db = sqlite3.connect(dbfile, detect_types=sqlite3.PARSE_DECLTYPES,
            # isolation_level = None)#,
            cached_statements=64)
        # If the configured table exists with a stale schema, pick a fresh
        # suffixed name rather than clobbering the old data.
        first_table = previous_table = self.table
        i=0
        while not self._check_table():
            i+=1
            self.table = first_table+'_%i'%i
            self.log.warn(
                "Table %s exists and doesn't match db format, trying %s"%
                (previous_table, self.table)
            )
            previous_table = self.table

        self._db.execute("""CREATE TABLE IF NOT EXISTS %s
                (msg_id text PRIMARY KEY,
                header dict text,
                content dict text,
                buffers bufs blob,
                submitted timestamp,
                client_uuid text,
                engine_uuid text,
                started timestamp,
                completed timestamp,
                resubmitted text,
                received timestamp,
                result_header dict text,
                result_content dict text,
                result_buffers bufs blob,
                queue text,
                pyin text,
                pyout text,
                pyerr text,
                stdout text,
                stderr text)
                """%self.table)
        self._db.commit()

    def _dict_to_list(self, d):
        """turn a mongodb-style record dict into a list, ordered by self._keys."""

        return [ d[key] for key in self._keys ]

    def _list_to_dict(self, line, keys=None):
        """Inverse of dict_to_list: missing keys are filled with None."""
        keys = self._keys if keys is None else keys
        d = self._defaults(keys)
        for key,value in zip(keys, line):
            d[key] = value

        return d

    def _render_expression(self, check):
        """Turn a mongodb-style search dict into an SQL query.

        Returns (where_expr, args) suitable for a parameterized execute.
        Raises KeyError on unknown column names or operators.
        """
        expressions = []
        args = []

        skeys = set(check.keys())
        skeys.difference_update(set(self._keys))
        skeys.difference_update(set(['buffers', 'result_buffers']))
        if skeys:
            raise KeyError("Illegal testing key(s): %s"%skeys)

        for name,sub_check in check.iteritems():
            if isinstance(sub_check, dict):
                for test,value in sub_check.iteritems():
                    try:
                        op = operators[test]
                    except KeyError:
                        raise KeyError("Unsupported operator: %r"%test)
                    if isinstance(op, tuple):
                        op, join = op

                    if value is None and op in null_operators:
                        expr = "%s %s" % (name, null_operators[op])
                    else:
                        expr = "%s %s ?"%(name, op)
                        if isinstance(value, (tuple,list)):
                            if op in null_operators and any([v is None for v in value]):
                                # equality tests don't work with NULL
                                raise ValueError("Cannot use %r test with NULL values on SQLite backend"%test)
                            expr = '( %s )'%( join.join([expr]*len(value)) )
                            args.extend(value)
                        else:
                            args.append(value)
                    expressions.append(expr)
            else:
                # it's an equality check
                if sub_check is None:
                    expressions.append("%s IS NULL" % name)
                else:
                    expressions.append("%s = ?"%name)
                    args.append(sub_check)

        expr = " AND ".join(expressions)
        return expr, args

    def add_record(self, msg_id, rec):
        """Add a new Task Record, by msg_id."""
        d = self._defaults()
        d.update(rec)
        d['msg_id'] = msg_id
        line = self._dict_to_list(d)
        tups = '(%s)'%(','.join(['?']*len(line)))
        self._db.execute("INSERT INTO %s VALUES %s"%(self.table, tups), line)
        # self._db.commit()

    def get_record(self, msg_id):
        """Get a specific Task Record, by msg_id.

        Raises KeyError if no such record exists.
        """
        cursor = self._db.execute("""SELECT * FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
        line = cursor.fetchone()
        if line is None:
            raise KeyError("No such msg: %r"%msg_id)
        return self._list_to_dict(line)

    def update_record(self, msg_id, rec):
        """Update the data in an existing record."""
        if not rec:
            # nothing to update; an empty SET clause would be a SQL syntax error
            return
        query = "UPDATE %s SET "%self.table
        sets = []
        keys = sorted(rec.keys())
        values = []
        for key in keys:
            sets.append('%s = ?'%key)
            values.append(rec[key])
        query += ', '.join(sets)
        query += ' WHERE msg_id == ?'
        values.append(msg_id)
        self._db.execute(query, values)
        # self._db.commit()

    def drop_record(self, msg_id):
        """Remove a record from the DB."""
        self._db.execute("""DELETE FROM %s WHERE msg_id==?"""%self.table, (msg_id,))
        # self._db.commit()

    def drop_matching_records(self, check):
        """Remove all records matching a query dict from the DB."""
        expr,args = self._render_expression(check)
        query = "DELETE FROM %s WHERE %s"%(self.table, expr)
        self._db.execute(query,args)
        # self._db.commit()

    def find_records(self, check, keys=None):
        """Find records matching a query dict, optionally extracting subset of keys.

        Returns list of matching records.

        Parameters
        ----------

        check: dict
            mongodb-style query argument
        keys: list of strs [optional]
            if specified, the subset of keys to extract. msg_id will *always* be
            included.
        """
        if keys:
            bad_keys = [ key for key in keys if key not in self._keys ]
            if bad_keys:
                raise KeyError("Bad record key(s): %s"%bad_keys)
            # ensure msg_id is present and first:
            if 'msg_id' in keys:
                keys.remove('msg_id')
            keys.insert(0, 'msg_id')
            req = ', '.join(keys)
        else:
            req = '*'
        expr,args = self._render_expression(check)
        query = """SELECT %s FROM %s WHERE %s"""%(req, self.table, expr)
        cursor = self._db.execute(query, args)
        matches = cursor.fetchall()
        records = []
        for line in matches:
            rec = self._list_to_dict(line, keys)
            records.append(rec)
        return records

    def get_history(self):
        """get all msg_ids, ordered by time submitted."""
        query = """SELECT msg_id FROM %s ORDER by submitted ASC"""%self.table
        cursor = self._db.execute(query)
        # will be a list of length 1 tuples
        return [ tup[0] for tup in cursor.fetchall()]
@@ -1,386 +1,387 b''
1 """Tests for parallel client.py
1 """Tests for parallel client.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23 from tempfile import mktemp
23 from tempfile import mktemp
24
24
25 import zmq
25 import zmq
26
26
27 from IPython.parallel.client import client as clientmod
27 from IPython.parallel.client import client as clientmod
28 from IPython.parallel import error
28 from IPython.parallel import error
29 from IPython.parallel import AsyncResult, AsyncHubResult
29 from IPython.parallel import AsyncResult, AsyncHubResult
30 from IPython.parallel import LoadBalancedView, DirectView
30 from IPython.parallel import LoadBalancedView, DirectView
31
31
32 from clienttest import ClusterTestCase, segfault, wait, add_engines
32 from clienttest import ClusterTestCase, segfault, wait, add_engines
33
33
def setup():
    """module-level test setup: ensure at least 4 engines are running."""
    add_engines(4, total=True)
36
36
37 class TestClient(ClusterTestCase):
37 class TestClient(ClusterTestCase):
38
38
39 def test_ids(self):
39 def test_ids(self):
40 n = len(self.client.ids)
40 n = len(self.client.ids)
41 self.add_engines(2)
41 self.add_engines(2)
42 self.assertEquals(len(self.client.ids), n+2)
42 self.assertEquals(len(self.client.ids), n+2)
43
43
44 def test_view_indexing(self):
44 def test_view_indexing(self):
45 """test index access for views"""
45 """test index access for views"""
46 self.minimum_engines(4)
46 self.minimum_engines(4)
47 targets = self.client._build_targets('all')[-1]
47 targets = self.client._build_targets('all')[-1]
48 v = self.client[:]
48 v = self.client[:]
49 self.assertEquals(v.targets, targets)
49 self.assertEquals(v.targets, targets)
50 t = self.client.ids[2]
50 t = self.client.ids[2]
51 v = self.client[t]
51 v = self.client[t]
52 self.assert_(isinstance(v, DirectView))
52 self.assert_(isinstance(v, DirectView))
53 self.assertEquals(v.targets, t)
53 self.assertEquals(v.targets, t)
54 t = self.client.ids[2:4]
54 t = self.client.ids[2:4]
55 v = self.client[t]
55 v = self.client[t]
56 self.assert_(isinstance(v, DirectView))
56 self.assert_(isinstance(v, DirectView))
57 self.assertEquals(v.targets, t)
57 self.assertEquals(v.targets, t)
58 v = self.client[::2]
58 v = self.client[::2]
59 self.assert_(isinstance(v, DirectView))
59 self.assert_(isinstance(v, DirectView))
60 self.assertEquals(v.targets, targets[::2])
60 self.assertEquals(v.targets, targets[::2])
61 v = self.client[1::3]
61 v = self.client[1::3]
62 self.assert_(isinstance(v, DirectView))
62 self.assert_(isinstance(v, DirectView))
63 self.assertEquals(v.targets, targets[1::3])
63 self.assertEquals(v.targets, targets[1::3])
64 v = self.client[:-3]
64 v = self.client[:-3]
65 self.assert_(isinstance(v, DirectView))
65 self.assert_(isinstance(v, DirectView))
66 self.assertEquals(v.targets, targets[:-3])
66 self.assertEquals(v.targets, targets[:-3])
67 v = self.client[-1]
67 v = self.client[-1]
68 self.assert_(isinstance(v, DirectView))
68 self.assert_(isinstance(v, DirectView))
69 self.assertEquals(v.targets, targets[-1])
69 self.assertEquals(v.targets, targets[-1])
70 self.assertRaises(TypeError, lambda : self.client[None])
70 self.assertRaises(TypeError, lambda : self.client[None])
71
71
72 def test_lbview_targets(self):
72 def test_lbview_targets(self):
73 """test load_balanced_view targets"""
73 """test load_balanced_view targets"""
74 v = self.client.load_balanced_view()
74 v = self.client.load_balanced_view()
75 self.assertEquals(v.targets, None)
75 self.assertEquals(v.targets, None)
76 v = self.client.load_balanced_view(-1)
76 v = self.client.load_balanced_view(-1)
77 self.assertEquals(v.targets, [self.client.ids[-1]])
77 self.assertEquals(v.targets, [self.client.ids[-1]])
78 v = self.client.load_balanced_view('all')
78 v = self.client.load_balanced_view('all')
79 self.assertEquals(v.targets, None)
79 self.assertEquals(v.targets, None)
80
80
81 def test_dview_targets(self):
81 def test_dview_targets(self):
82 """test direct_view targets"""
82 """test direct_view targets"""
83 v = self.client.direct_view()
83 v = self.client.direct_view()
84 self.assertEquals(v.targets, 'all')
84 self.assertEquals(v.targets, 'all')
85 v = self.client.direct_view('all')
85 v = self.client.direct_view('all')
86 self.assertEquals(v.targets, 'all')
86 self.assertEquals(v.targets, 'all')
87 v = self.client.direct_view(-1)
87 v = self.client.direct_view(-1)
88 self.assertEquals(v.targets, self.client.ids[-1])
88 self.assertEquals(v.targets, self.client.ids[-1])
89
89
90 def test_lazy_all_targets(self):
90 def test_lazy_all_targets(self):
91 """test lazy evaluation of rc.direct_view('all')"""
91 """test lazy evaluation of rc.direct_view('all')"""
92 v = self.client.direct_view()
92 v = self.client.direct_view()
93 self.assertEquals(v.targets, 'all')
93 self.assertEquals(v.targets, 'all')
94
94
95 def double(x):
95 def double(x):
96 return x*2
96 return x*2
97 seq = range(100)
97 seq = range(100)
98 ref = [ double(x) for x in seq ]
98 ref = [ double(x) for x in seq ]
99
99
100 # add some engines, which should be used
100 # add some engines, which should be used
101 self.add_engines(1)
101 self.add_engines(1)
102 n1 = len(self.client.ids)
102 n1 = len(self.client.ids)
103
103
104 # simple apply
104 # simple apply
105 r = v.apply_sync(lambda : 1)
105 r = v.apply_sync(lambda : 1)
106 self.assertEquals(r, [1] * n1)
106 self.assertEquals(r, [1] * n1)
107
107
108 # map goes through remotefunction
108 # map goes through remotefunction
109 r = v.map_sync(double, seq)
109 r = v.map_sync(double, seq)
110 self.assertEquals(r, ref)
110 self.assertEquals(r, ref)
111
111
112 # add a couple more engines, and try again
112 # add a couple more engines, and try again
113 self.add_engines(2)
113 self.add_engines(2)
114 n2 = len(self.client.ids)
114 n2 = len(self.client.ids)
115 self.assertNotEquals(n2, n1)
115 self.assertNotEquals(n2, n1)
116
116
117 # apply
117 # apply
118 r = v.apply_sync(lambda : 1)
118 r = v.apply_sync(lambda : 1)
119 self.assertEquals(r, [1] * n2)
119 self.assertEquals(r, [1] * n2)
120
120
121 # map
121 # map
122 r = v.map_sync(double, seq)
122 r = v.map_sync(double, seq)
123 self.assertEquals(r, ref)
123 self.assertEquals(r, ref)
124
124
125 def test_targets(self):
125 def test_targets(self):
126 """test various valid targets arguments"""
126 """test various valid targets arguments"""
127 build = self.client._build_targets
127 build = self.client._build_targets
128 ids = self.client.ids
128 ids = self.client.ids
129 idents,targets = build(None)
129 idents,targets = build(None)
130 self.assertEquals(ids, targets)
130 self.assertEquals(ids, targets)
131
131
132 def test_clear(self):
132 def test_clear(self):
133 """test clear behavior"""
133 """test clear behavior"""
134 self.minimum_engines(2)
134 self.minimum_engines(2)
135 v = self.client[:]
135 v = self.client[:]
136 v.block=True
136 v.block=True
137 v.push(dict(a=5))
137 v.push(dict(a=5))
138 v.pull('a')
138 v.pull('a')
139 id0 = self.client.ids[-1]
139 id0 = self.client.ids[-1]
140 self.client.clear(targets=id0, block=True)
140 self.client.clear(targets=id0, block=True)
141 a = self.client[:-1].get('a')
141 a = self.client[:-1].get('a')
142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
142 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 self.client.clear(block=True)
143 self.client.clear(block=True)
144 for i in self.client.ids:
144 for i in self.client.ids:
145 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
145 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
146
146
147 def test_get_result(self):
147 def test_get_result(self):
148 """test getting results from the Hub."""
148 """test getting results from the Hub."""
149 c = clientmod.Client(profile='iptest')
149 c = clientmod.Client(profile='iptest')
150 t = c.ids[-1]
150 t = c.ids[-1]
151 ar = c[t].apply_async(wait, 1)
151 ar = c[t].apply_async(wait, 1)
152 # give the monitor time to notice the message
152 # give the monitor time to notice the message
153 time.sleep(.25)
153 time.sleep(.25)
154 ahr = self.client.get_result(ar.msg_ids)
154 ahr = self.client.get_result(ar.msg_ids)
155 self.assertTrue(isinstance(ahr, AsyncHubResult))
155 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 self.assertEquals(ahr.get(), ar.get())
156 self.assertEquals(ahr.get(), ar.get())
157 ar2 = self.client.get_result(ar.msg_ids)
157 ar2 = self.client.get_result(ar.msg_ids)
158 self.assertFalse(isinstance(ar2, AsyncHubResult))
158 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 c.close()
159 c.close()
160
160
161 def test_ids_list(self):
161 def test_ids_list(self):
162 """test client.ids"""
162 """test client.ids"""
163 ids = self.client.ids
163 ids = self.client.ids
164 self.assertEquals(ids, self.client._ids)
164 self.assertEquals(ids, self.client._ids)
165 self.assertFalse(ids is self.client._ids)
165 self.assertFalse(ids is self.client._ids)
166 ids.remove(ids[-1])
166 ids.remove(ids[-1])
167 self.assertNotEquals(ids, self.client._ids)
167 self.assertNotEquals(ids, self.client._ids)
168
168
169 def test_queue_status(self):
169 def test_queue_status(self):
170 ids = self.client.ids
170 ids = self.client.ids
171 id0 = ids[0]
171 id0 = ids[0]
172 qs = self.client.queue_status(targets=id0)
172 qs = self.client.queue_status(targets=id0)
173 self.assertTrue(isinstance(qs, dict))
173 self.assertTrue(isinstance(qs, dict))
174 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
174 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
175 allqs = self.client.queue_status()
175 allqs = self.client.queue_status()
176 self.assertTrue(isinstance(allqs, dict))
176 self.assertTrue(isinstance(allqs, dict))
177 intkeys = list(allqs.keys())
177 intkeys = list(allqs.keys())
178 intkeys.remove('unassigned')
178 intkeys.remove('unassigned')
179 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
179 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
180 unassigned = allqs.pop('unassigned')
180 unassigned = allqs.pop('unassigned')
181 for eid,qs in allqs.items():
181 for eid,qs in allqs.items():
182 self.assertTrue(isinstance(qs, dict))
182 self.assertTrue(isinstance(qs, dict))
183 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
183 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
184
184
185 def test_shutdown(self):
185 def test_shutdown(self):
186 ids = self.client.ids
186 ids = self.client.ids
187 id0 = ids[0]
187 id0 = ids[0]
188 self.client.shutdown(id0, block=True)
188 self.client.shutdown(id0, block=True)
189 while id0 in self.client.ids:
189 while id0 in self.client.ids:
190 time.sleep(0.1)
190 time.sleep(0.1)
191 self.client.spin()
191 self.client.spin()
192
192
193 self.assertRaises(IndexError, lambda : self.client[id0])
193 self.assertRaises(IndexError, lambda : self.client[id0])
194
194
195 def test_result_status(self):
195 def test_result_status(self):
196 pass
196 pass
197 # to be written
197 # to be written
198
198
199 def test_db_query_dt(self):
199 def test_db_query_dt(self):
200 """test db query by date"""
200 """test db query by date"""
201 hist = self.client.hub_history()
201 hist = self.client.hub_history()
202 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
202 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
203 tic = middle['submitted']
203 tic = middle['submitted']
204 before = self.client.db_query({'submitted' : {'$lt' : tic}})
204 before = self.client.db_query({'submitted' : {'$lt' : tic}})
205 after = self.client.db_query({'submitted' : {'$gte' : tic}})
205 after = self.client.db_query({'submitted' : {'$gte' : tic}})
206 self.assertEquals(len(before)+len(after),len(hist))
206 self.assertEquals(len(before)+len(after),len(hist))
207 for b in before:
207 for b in before:
208 self.assertTrue(b['submitted'] < tic)
208 self.assertTrue(b['submitted'] < tic)
209 for a in after:
209 for a in after:
210 self.assertTrue(a['submitted'] >= tic)
210 self.assertTrue(a['submitted'] >= tic)
211 same = self.client.db_query({'submitted' : tic})
211 same = self.client.db_query({'submitted' : tic})
212 for s in same:
212 for s in same:
213 self.assertTrue(s['submitted'] == tic)
213 self.assertTrue(s['submitted'] == tic)
214
214
215 def test_db_query_keys(self):
215 def test_db_query_keys(self):
216 """test extracting subset of record keys"""
216 """test extracting subset of record keys"""
217 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
217 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
218 for rec in found:
218 for rec in found:
219 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
219 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
220
220
221 def test_db_query_default_keys(self):
221 def test_db_query_default_keys(self):
222 """default db_query excludes buffers"""
222 """default db_query excludes buffers"""
223 found = self.client.db_query({'msg_id': {'$ne' : ''}})
223 found = self.client.db_query({'msg_id': {'$ne' : ''}})
224 for rec in found:
224 for rec in found:
225 keys = set(rec.keys())
225 keys = set(rec.keys())
226 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
226 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
227 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
227 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
228
228
229 def test_db_query_msg_id(self):
229 def test_db_query_msg_id(self):
230 """ensure msg_id is always in db queries"""
230 """ensure msg_id is always in db queries"""
231 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
231 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
232 for rec in found:
232 for rec in found:
233 self.assertTrue('msg_id' in rec.keys())
233 self.assertTrue('msg_id' in rec.keys())
234 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
234 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
235 for rec in found:
235 for rec in found:
236 self.assertTrue('msg_id' in rec.keys())
236 self.assertTrue('msg_id' in rec.keys())
237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
238 for rec in found:
238 for rec in found:
239 self.assertTrue('msg_id' in rec.keys())
239 self.assertTrue('msg_id' in rec.keys())
240
240
241 def test_db_query_get_result(self):
241 def test_db_query_get_result(self):
242 """pop in db_query shouldn't pop from result itself"""
242 """pop in db_query shouldn't pop from result itself"""
243 self.client[:].apply_sync(lambda : 1)
243 self.client[:].apply_sync(lambda : 1)
244 found = self.client.db_query({'msg_id': {'$ne' : ''}})
244 found = self.client.db_query({'msg_id': {'$ne' : ''}})
245 rc2 = clientmod.Client(profile='iptest')
245 rc2 = clientmod.Client(profile='iptest')
246 # If this bug is not fixed, this call will hang:
246 # If this bug is not fixed, this call will hang:
247 ar = rc2.get_result(self.client.history[-1])
247 ar = rc2.get_result(self.client.history[-1])
248 ar.wait(2)
248 ar.wait(2)
249 self.assertTrue(ar.ready())
249 self.assertTrue(ar.ready())
250 ar.get()
250 ar.get()
251 rc2.close()
251 rc2.close()
252
252
253 def test_db_query_in(self):
253 def test_db_query_in(self):
254 """test db query with '$in','$nin' operators"""
254 """test db query with '$in','$nin' operators"""
255 hist = self.client.hub_history()
255 hist = self.client.hub_history()
256 even = hist[::2]
256 even = hist[::2]
257 odd = hist[1::2]
257 odd = hist[1::2]
258 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
258 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
259 found = [ r['msg_id'] for r in recs ]
259 found = [ r['msg_id'] for r in recs ]
260 self.assertEquals(set(even), set(found))
260 self.assertEquals(set(even), set(found))
261 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
261 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
262 found = [ r['msg_id'] for r in recs ]
262 found = [ r['msg_id'] for r in recs ]
263 self.assertEquals(set(odd), set(found))
263 self.assertEquals(set(odd), set(found))
264
264
265 def test_hub_history(self):
265 def test_hub_history(self):
266 hist = self.client.hub_history()
266 hist = self.client.hub_history()
267 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
267 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
268 recdict = {}
268 recdict = {}
269 for rec in recs:
269 for rec in recs:
270 recdict[rec['msg_id']] = rec
270 recdict[rec['msg_id']] = rec
271
271
272 latest = datetime(1984,1,1)
272 latest = datetime(1984,1,1)
273 for msg_id in hist:
273 for msg_id in hist:
274 rec = recdict[msg_id]
274 rec = recdict[msg_id]
275 newt = rec['submitted']
275 newt = rec['submitted']
276 self.assertTrue(newt >= latest)
276 self.assertTrue(newt >= latest)
277 latest = newt
277 latest = newt
278 ar = self.client[-1].apply_async(lambda : 1)
278 ar = self.client[-1].apply_async(lambda : 1)
279 ar.get()
279 ar.get()
280 time.sleep(0.25)
280 time.sleep(0.25)
281 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
281 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
282
282
283 def _wait_for_idle(self):
283 def _wait_for_idle(self):
284 """wait for an engine to become idle, according to the Hub"""
284 """wait for an engine to become idle, according to the Hub"""
285 rc = self.client
285 rc = self.client
286
286
287 # timeout 2s, polling every 100ms
287 # timeout 2s, polling every 100ms
288 for i in range(20):
288 for i in range(20):
289 qs = rc.queue_status()
289 qs = rc.queue_status()
290 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
290 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
291 time.sleep(0.1)
291 time.sleep(0.1)
292 else:
292 else:
293 break
293 break
294
294
295 # ensure Hub up to date:
295 # ensure Hub up to date:
296 qs = rc.queue_status()
296 qs = rc.queue_status()
297 self.assertEquals(qs['unassigned'], 0)
297 self.assertEquals(qs['unassigned'], 0)
298 for eid in rc.ids:
298 for eid in rc.ids:
299 self.assertEquals(qs[eid]['tasks'], 0)
299 self.assertEquals(qs[eid]['tasks'], 0)
300
300
301
301
302 def test_resubmit(self):
302 def test_resubmit(self):
303 def f():
303 def f():
304 import random
304 import random
305 return random.random()
305 return random.random()
306 v = self.client.load_balanced_view()
306 v = self.client.load_balanced_view()
307 ar = v.apply_async(f)
307 ar = v.apply_async(f)
308 r1 = ar.get(1)
308 r1 = ar.get(1)
309 # give the Hub a chance to notice:
309 # give the Hub a chance to notice:
310 self._wait_for_idle()
310 self._wait_for_idle()
311 ahr = self.client.resubmit(ar.msg_ids)
311 ahr = self.client.resubmit(ar.msg_ids)
312 r2 = ahr.get(1)
312 r2 = ahr.get(1)
313 self.assertFalse(r1 == r2)
313 self.assertFalse(r1 == r2)
314
314
315 def test_resubmit_aborted(self):
315 def test_resubmit_aborted(self):
316 def f():
316 def f():
317 import random
317 import random
318 return random.random()
318 return random.random()
319 v = self.client.load_balanced_view()
319 v = self.client.load_balanced_view()
320 # restrict to one engine, so we can put a sleep
320 # restrict to one engine, so we can put a sleep
321 # ahead of the task, so it will get aborted
321 # ahead of the task, so it will get aborted
322 eid = self.client.ids[-1]
322 eid = self.client.ids[-1]
323 v.targets = [eid]
323 v.targets = [eid]
324 sleep = v.apply_async(time.sleep, 0.5)
324 sleep = v.apply_async(time.sleep, 0.5)
325 ar = v.apply_async(f)
325 ar = v.apply_async(f)
326 ar.abort()
326 ar.abort()
327 self.assertRaises(error.TaskAborted, ar.get)
327 self.assertRaises(error.TaskAborted, ar.get)
328 # Give the Hub a chance to get up to date:
328 # Give the Hub a chance to get up to date:
329 self._wait_for_idle()
329 self._wait_for_idle()
330 ahr = self.client.resubmit(ar.msg_ids)
330 ahr = self.client.resubmit(ar.msg_ids)
331 r2 = ahr.get(1)
331 r2 = ahr.get(1)
332
332
333 def test_resubmit_inflight(self):
333 def test_resubmit_inflight(self):
334 """ensure ValueError on resubmit of inflight task"""
334 """resubmit of inflight task"""
335 v = self.client.load_balanced_view()
335 v = self.client.load_balanced_view()
336 ar = v.apply_async(time.sleep,1)
336 ar = v.apply_async(time.sleep,1)
337 # give the message a chance to arrive
337 # give the message a chance to arrive
338 time.sleep(0.2)
338 time.sleep(0.2)
339 self.assertRaisesRemote(ValueError, self.client.resubmit, ar.msg_ids)
339 ahr = self.client.resubmit(ar.msg_ids)
340 ar.get(2)
340 ar.get(2)
341 ahr.get(2)
341
342
342 def test_resubmit_badkey(self):
343 def test_resubmit_badkey(self):
343 """ensure KeyError on resubmit of nonexistant task"""
344 """ensure KeyError on resubmit of nonexistant task"""
344 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
345 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
345
346
346 def test_purge_results(self):
347 def test_purge_results(self):
347 # ensure there are some tasks
348 # ensure there are some tasks
348 for i in range(5):
349 for i in range(5):
349 self.client[:].apply_sync(lambda : 1)
350 self.client[:].apply_sync(lambda : 1)
350 # Wait for the Hub to realise the result is done:
351 # Wait for the Hub to realise the result is done:
351 # This prevents a race condition, where we
352 # This prevents a race condition, where we
352 # might purge a result the Hub still thinks is pending.
353 # might purge a result the Hub still thinks is pending.
353 time.sleep(0.1)
354 time.sleep(0.1)
354 rc2 = clientmod.Client(profile='iptest')
355 rc2 = clientmod.Client(profile='iptest')
355 hist = self.client.hub_history()
356 hist = self.client.hub_history()
356 ahr = rc2.get_result([hist[-1]])
357 ahr = rc2.get_result([hist[-1]])
357 ahr.wait(10)
358 ahr.wait(10)
358 self.client.purge_results(hist[-1])
359 self.client.purge_results(hist[-1])
359 newhist = self.client.hub_history()
360 newhist = self.client.hub_history()
360 self.assertEquals(len(newhist)+1,len(hist))
361 self.assertEquals(len(newhist)+1,len(hist))
361 rc2.spin()
362 rc2.spin()
362 rc2.close()
363 rc2.close()
363
364
364 def test_purge_all_results(self):
365 def test_purge_all_results(self):
365 self.client.purge_results('all')
366 self.client.purge_results('all')
366 hist = self.client.hub_history()
367 hist = self.client.hub_history()
367 self.assertEquals(len(hist), 0)
368 self.assertEquals(len(hist), 0)
368
369
369 def test_spin_thread(self):
370 def test_spin_thread(self):
370 self.client.spin_thread(0.01)
371 self.client.spin_thread(0.01)
371 ar = self.client[-1].apply_async(lambda : 1)
372 ar = self.client[-1].apply_async(lambda : 1)
372 time.sleep(0.1)
373 time.sleep(0.1)
373 self.assertTrue(ar.wall_time < 0.1,
374 self.assertTrue(ar.wall_time < 0.1,
374 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
375 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
375 )
376 )
376
377
377 def test_stop_spin_thread(self):
378 def test_stop_spin_thread(self):
378 self.client.spin_thread(0.01)
379 self.client.spin_thread(0.01)
379 self.client.stop_spin_thread()
380 self.client.stop_spin_thread()
380 ar = self.client[-1].apply_async(lambda : 1)
381 ar = self.client[-1].apply_async(lambda : 1)
381 time.sleep(0.15)
382 time.sleep(0.15)
382 self.assertTrue(ar.wall_time > 0.1,
383 self.assertTrue(ar.wall_time > 0.1,
383 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
384 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
384 )
385 )
385
386
386
387
@@ -1,137 +1,137 b''
1 .. _parallel_db:
1 .. _parallel_db:
2
2
3 =======================
3 =======================
4 IPython's Task Database
4 IPython's Task Database
5 =======================
5 =======================
6
6
7 The IPython Hub stores all task requests and results in a database. Currently supported backends
7 The IPython Hub stores all task requests and results in a database. Currently supported backends
8 are: MongoDB, SQLite (the default), and an in-memory DictDB. The most common use case for
8 are: MongoDB, SQLite (the default), and an in-memory DictDB. The most common use case for
9 this is clients requesting results for tasks they did not submit, via:
9 this is clients requesting results for tasks they did not submit, via:
10
10
11 .. sourcecode:: ipython
11 .. sourcecode:: ipython
12
12
13 In [1]: rc.get_result(task_id)
13 In [1]: rc.get_result(task_id)
14
14
15 However, since we have this DB backend, we provide a direct query method in the :class:`client`
15 However, since we have this DB backend, we provide a direct query method in the :class:`client`
16 for users who want deeper introspection into their task history. The :meth:`db_query` method of
16 for users who want deeper introspection into their task history. The :meth:`db_query` method of
17 the Client is modeled after MongoDB queries, so if you have used MongoDB it should look
17 the Client is modeled after MongoDB queries, so if you have used MongoDB it should look
18 familiar. In fact, when the MongoDB backend is in use, the query is relayed directly. However,
18 familiar. In fact, when the MongoDB backend is in use, the query is relayed directly. However,
19 when using other backends, the interface is emulated and only a subset of queries is possible.
19 when using other backends, the interface is emulated and only a subset of queries is possible.
20
20
21 .. seealso::
21 .. seealso::
22
22
23 MongoDB query docs: http://www.mongodb.org/display/DOCS/Querying
23 MongoDB query docs: http://www.mongodb.org/display/DOCS/Querying
24
24
25 :meth:`Client.db_query` takes a dictionary query object, with keys from the TaskRecord key list,
25 :meth:`Client.db_query` takes a dictionary query object, with keys from the TaskRecord key list,
26 and values of either exact values to test, or MongoDB queries, which are dicts of The form:
26 and values of either exact values to test, or MongoDB queries, which are dicts of The form:
27 ``{'operator' : 'argument(s)'}``. There is also an optional `keys` argument, that specifies
27 ``{'operator' : 'argument(s)'}``. There is also an optional `keys` argument, that specifies
28 which subset of keys should be retrieved. The default is to retrieve all keys excluding the
28 which subset of keys should be retrieved. The default is to retrieve all keys excluding the
29 request and result buffers. :meth:`db_query` returns a list of TaskRecord dicts. Also like
29 request and result buffers. :meth:`db_query` returns a list of TaskRecord dicts. Also like
30 MongoDB, the `msg_id` key will always be included, whether requested or not.
30 MongoDB, the `msg_id` key will always be included, whether requested or not.
31
31
32 TaskRecord keys:
32 TaskRecord keys:
33
33
34 =============== =============== =============
34 =============== =============== =============
35 Key Type Description
35 Key Type Description
36 =============== =============== =============
36 =============== =============== =============
37 msg_id uuid(bytes) The msg ID
37 msg_id uuid(ascii) The msg ID
38 header dict The request header
38 header dict The request header
39 content dict The request content (likely empty)
39 content dict The request content (likely empty)
40 buffers list(bytes) buffers containing serialized request objects
40 buffers list(bytes) buffers containing serialized request objects
41 submitted datetime timestamp for time of submission (set by client)
41 submitted datetime timestamp for time of submission (set by client)
42 client_uuid uuid(bytes) IDENT of client's socket
42 client_uuid uuid(bytes) IDENT of client's socket
43 engine_uuid uuid(bytes) IDENT of engine's socket
43 engine_uuid uuid(bytes) IDENT of engine's socket
44 started datetime time task began execution on engine
44 started datetime time task began execution on engine
45 completed datetime time task finished execution (success or failure) on engine
45 completed datetime time task finished execution (success or failure) on engine
46 resubmitted datetime time of resubmission (if applicable)
46 resubmitted uuid(ascii) msg_id of resubmitted task (if applicable)
47 result_header dict header for result
47 result_header dict header for result
48 result_content dict content for result
48 result_content dict content for result
49 result_buffers list(bytes) buffers containing serialized request objects
49 result_buffers list(bytes) buffers containing serialized request objects
50 queue bytes The name of the queue for the task ('mux' or 'task')
50 queue bytes The name of the queue for the task ('mux' or 'task')
51 pyin <unused> Python input (unused)
51 pyin <unused> Python input (unused)
52 pyout <unused> Python output (unused)
52 pyout <unused> Python output (unused)
53 pyerr <unused> Python traceback (unused)
53 pyerr <unused> Python traceback (unused)
54 stdout str Stream of stdout data
54 stdout str Stream of stdout data
55 stderr str Stream of stderr data
55 stderr str Stream of stderr data
56
56
57 =============== =============== =============
57 =============== =============== =============
58
58
59 MongoDB operators we emulate on all backends:
59 MongoDB operators we emulate on all backends:
60
60
61 ========== =================
61 ========== =================
62 Operator Python equivalent
62 Operator Python equivalent
63 ========== =================
63 ========== =================
64 '$in' in
64 '$in' in
65 '$nin' not in
65 '$nin' not in
66 '$eq' ==
66 '$eq' ==
67 '$ne' !=
67 '$ne' !=
68 '$ge' >
68 '$ge' >
69 '$gte' >=
69 '$gte' >=
70 '$le' <
70 '$le' <
71 '$lte' <=
71 '$lte' <=
72 ========== =================
72 ========== =================
73
73
74
74
75 The DB Query is useful for two primary cases:
75 The DB Query is useful for two primary cases:
76
76
77 1. deep polling of task status or metadata
77 1. deep polling of task status or metadata
78 2. selecting a subset of tasks, on which to perform a later operation (e.g. wait on result, purge records, resubmit,...)
78 2. selecting a subset of tasks, on which to perform a later operation (e.g. wait on result, purge records, resubmit,...)
79
79
80 Example Queries
80 Example Queries
81 ===============
81 ===============
82
82
83
83
84 To get all msg_ids that are not completed, only retrieving their ID and start time:
84 To get all msg_ids that are not completed, only retrieving their ID and start time:
85
85
86 .. sourcecode:: ipython
86 .. sourcecode:: ipython
87
87
88 In [1]: incomplete = rc.db_query({'complete' : None}, keys=['msg_id', 'started'])
88 In [1]: incomplete = rc.db_query({'complete' : None}, keys=['msg_id', 'started'])
89
89
90 All jobs started in the last hour by me:
90 All jobs started in the last hour by me:
91
91
92 .. sourcecode:: ipython
92 .. sourcecode:: ipython
93
93
94 In [1]: from datetime import datetime, timedelta
94 In [1]: from datetime import datetime, timedelta
95
95
96 In [2]: hourago = datetime.now() - timedelta(1./24)
96 In [2]: hourago = datetime.now() - timedelta(1./24)
97
97
98 In [3]: recent = rc.db_query({'started' : {'$gte' : hourago },
98 In [3]: recent = rc.db_query({'started' : {'$gte' : hourago },
99 'client_uuid' : rc.session.session})
99 'client_uuid' : rc.session.session})
100
100
101 All jobs started more than an hour ago, by clients *other than me*:
101 All jobs started more than an hour ago, by clients *other than me*:
102
102
103 .. sourcecode:: ipython
103 .. sourcecode:: ipython
104
104
105 In [3]: recent = rc.db_query({'started' : {'$le' : hourago },
105 In [3]: recent = rc.db_query({'started' : {'$le' : hourago },
106 'client_uuid' : {'$ne' : rc.session.session}})
106 'client_uuid' : {'$ne' : rc.session.session}})
107
107
108 Result headers for all jobs on engine 3 or 4:
108 Result headers for all jobs on engine 3 or 4:
109
109
110 .. sourcecode:: ipython
110 .. sourcecode:: ipython
111
111
112 In [1]: uuids = map(rc._engines.get, (3,4))
112 In [1]: uuids = map(rc._engines.get, (3,4))
113
113
114 In [2]: hist34 = rc.db_query({'engine_uuid' : {'$in' : uuids }, keys='result_header')
114 In [2]: hist34 = rc.db_query({'engine_uuid' : {'$in' : uuids }, keys='result_header')
115
115
116
116
117 Cost
117 Cost
118 ====
118 ====
119
119
120 The advantage of the database backends is, of course, that large amounts of
120 The advantage of the database backends is, of course, that large amounts of
121 data can be stored that won't fit in memory. The default 'backend' is actually
121 data can be stored that won't fit in memory. The default 'backend' is actually
122 to just store all of this information in a Python dictionary. This is very fast,
122 to just store all of this information in a Python dictionary. This is very fast,
123 but will run out of memory quickly if you move a lot of data around, or your
123 but will run out of memory quickly if you move a lot of data around, or your
124 cluster is to run for a long time.
124 cluster is to run for a long time.
125
125
126 Unfortunately, the DB backends (SQLite and MongoDB) right now are rather slow,
126 Unfortunately, the DB backends (SQLite and MongoDB) right now are rather slow,
127 and can still consume large amounts of resources, particularly if large tasks
127 and can still consume large amounts of resources, particularly if large tasks
128 or results are being created at a high frequency.
128 or results are being created at a high frequency.
129
129
130 For this reason, we have added :class:`~.NoDB`,a dummy backend that doesn't
130 For this reason, we have added :class:`~.NoDB`,a dummy backend that doesn't
131 actually store any information. When you use this database, nothing is stored,
131 actually store any information. When you use this database, nothing is stored,
132 and any request for results will result in a KeyError. This obviously prevents
132 and any request for results will result in a KeyError. This obviously prevents
133 later requests for results and task resubmission from functioning, but
133 later requests for results and task resubmission from functioning, but
134 sometimes those nice features are not as useful as keeping Hub memory under
134 sometimes those nice features are not as useful as keeping Hub memory under
135 control.
135 control.
136
136
137
137
General Comments 0
You need to be logged in to leave comments. Login now