##// END OF EJS Templates
fix & test HubResults from execute requests
MinRK -
Show More
@@ -1,1717 +1,1721 b''
1 """A semi-synchronous Client for the ZMQ cluster
1 """A semi-synchronous Client for the ZMQ cluster
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import json
19 import json
20 import sys
20 import sys
21 from threading import Thread, Event
21 from threading import Thread, Event
22 import time
22 import time
23 import warnings
23 import warnings
24 from datetime import datetime
24 from datetime import datetime
25 from getpass import getpass
25 from getpass import getpass
26 from pprint import pprint
26 from pprint import pprint
27
27
28 pjoin = os.path.join
28 pjoin = os.path.join
29
29
30 import zmq
30 import zmq
31 # from zmq.eventloop import ioloop, zmqstream
31 # from zmq.eventloop import ioloop, zmqstream
32
32
33 from IPython.config.configurable import MultipleInstanceError
33 from IPython.config.configurable import MultipleInstanceError
34 from IPython.core.application import BaseIPythonApplication
34 from IPython.core.application import BaseIPythonApplication
35 from IPython.core.profiledir import ProfileDir, ProfileDirError
35 from IPython.core.profiledir import ProfileDir, ProfileDirError
36
36
37 from IPython.utils.coloransi import TermColors
37 from IPython.utils.coloransi import TermColors
38 from IPython.utils.jsonutil import rekey
38 from IPython.utils.jsonutil import rekey
39 from IPython.utils.localinterfaces import LOCAL_IPS
39 from IPython.utils.localinterfaces import LOCAL_IPS
40 from IPython.utils.path import get_ipython_dir
40 from IPython.utils.path import get_ipython_dir
41 from IPython.utils.py3compat import cast_bytes
41 from IPython.utils.py3compat import cast_bytes
42 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
42 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
43 Dict, List, Bool, Set, Any)
43 Dict, List, Bool, Set, Any)
44 from IPython.external.decorator import decorator
44 from IPython.external.decorator import decorator
45 from IPython.external.ssh import tunnel
45 from IPython.external.ssh import tunnel
46
46
47 from IPython.parallel import Reference
47 from IPython.parallel import Reference
48 from IPython.parallel import error
48 from IPython.parallel import error
49 from IPython.parallel import util
49 from IPython.parallel import util
50
50
51 from IPython.zmq.session import Session, Message
51 from IPython.zmq.session import Session, Message
52
52
53 from .asyncresult import AsyncResult, AsyncHubResult
53 from .asyncresult import AsyncResult, AsyncHubResult
54 from .view import DirectView, LoadBalancedView
54 from .view import DirectView, LoadBalancedView
55
55
if sys.version_info[0] >= 3:
    # xrange is used in a couple 'isinstance' tests in py2
    # should be just 'range' in 3k
    # alias at module level so the same code runs on both major versions
    xrange = range
60
60
61 #--------------------------------------------------------------------------
61 #--------------------------------------------------------------------------
62 # Decorators for Client methods
62 # Decorators for Client methods
63 #--------------------------------------------------------------------------
63 #--------------------------------------------------------------------------
64
64
@decorator
def spin_first(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method."""
    # flush incoming results/registration messages so that `self` reflects
    # the Hub's current state before f executes
    self.spin()
    return f(self, *args, **kwargs)
70
70
71
71
72 #--------------------------------------------------------------------------
72 #--------------------------------------------------------------------------
73 # Classes
73 # Classes
74 #--------------------------------------------------------------------------
74 #--------------------------------------------------------------------------
75
75
76
76
class ExecuteReply(object):
    """Wrapper for finished Execute results.

    Provides dict-style (``reply['stdout']``) and attribute-style
    (``reply.stdout``) access to the request metadata, and the standard
    IPython ``_repr_*_`` display hooks backed by the captured pyout
    display data.
    """
    def __init__(self, msg_id, content, metadata):
        """Store the reply.

        Parameters
        ----------
        msg_id : str
            id of the execute request this reply answers
        content : dict
            reply message content; must contain 'execution_count'
        metadata : dict
            request metadata (timestamps, engine info, captured output)
        """
        self.msg_id = msg_id
        self._content = content
        self.execution_count = content['execution_count']
        self.metadata = metadata

    def _pyout_data(self):
        """Return the display-data dict of the pyout message.

        Returns an empty dict when no pyout was captured, so callers can
        unconditionally ``.get()`` a mime type.
        """
        pyout = self.metadata['pyout'] or {'data': {}}
        return pyout['data']

    def __getitem__(self, key):
        # item access is an alias for metadata lookup
        return self.metadata[key]

    def __getattr__(self, key):
        # unknown attributes fall through to metadata keys
        if key not in self.metadata:
            raise AttributeError(key)
        return self.metadata[key]

    def __repr__(self):
        text_out = self._pyout_data().get('text/plain', '')
        if len(text_out) > 32:
            # truncate long reprs, leaving room for the ellipsis
            text_out = text_out[:29] + '...'

        return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)

    def _repr_pretty_(self, p, cycle):
        """IPython pretty-print hook: emit an Out[engine:count] prompt."""
        text_out = self._pyout_data().get('text/plain', '')

        if not text_out:
            return

        try:
            ip = get_ipython()
        except NameError:
            # not running inside IPython: no color support
            colors = "NoColor"
        else:
            colors = ip.colors

        if colors == "NoColor":
            out = normal = ""
        else:
            out = TermColors.Red
            normal = TermColors.Normal

        if '\n' in text_out and not text_out.startswith('\n'):
            # add newline for multiline reprs
            text_out = '\n' + text_out

        p.text(
            out + u'Out[%i:%i]: ' % (
                self.metadata['engine_id'], self.execution_count
            ) + normal + text_out
        )

    def _repr_html_(self):
        return self._pyout_data().get("text/html")

    def _repr_latex_(self):
        return self._pyout_data().get("text/latex")

    def _repr_json_(self):
        return self._pyout_data().get("application/json")

    def _repr_javascript_(self):
        return self._pyout_data().get("application/javascript")

    def _repr_png_(self):
        return self._pyout_data().get("image/png")

    def _repr_jpeg_(self):
        return self._pyout_data().get("image/jpeg")

    def _repr_svg_(self):
        return self._pyout_data().get("image/svg+xml")
158
158
159
159
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        # the fixed key set: request bookkeeping (ids, timestamps, engine,
        # scheduling dependencies) plus the captured output channels
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,

              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
              'outputs' : [],
              'outputs_ready' : False,
            }
        self.update(md)
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # `key in self` is an O(1) hash lookup and works on Python 2 and 3;
        # the previous `key in self.iterkeys()` was a linear scan on py2 and
        # fails on py3, where dict.iterkeys no longer exists
        if key in self:
            return self[key]
        else:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key in self:
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
212
212
213
213
class Client(HasTraits):
    """A semi-synchronous client to the IPython ZMQ cluster

    Parameters
    ----------

    url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
        Connection information for the Hub's registration.  If a json connector
        file is given, then likely no further configuration is necessary.
        [Default: use profile]
    profile : bytes
        The name of the Cluster profile to be used to find connector information.
        If run from an IPython application, the default profile will be the same
        as the running application, otherwise it will be 'default'.
    context : zmq.Context
        Pass an existing zmq.Context instance, otherwise the client will create its own.
    debug : bool
        flag for lots of message printing for debug purposes
    timeout : int/float
        time (in seconds) to wait for connection replies from the Hub
        [Default: 10]

    #-------------- session related args ----------------

    config : Config object
        If specified, this will be relayed to the Session for configuration
    username : str
        set username for the session object
    packer : str (import_string) or callable
        Can be either the simple keyword 'json' or 'pickle', or an import_string to a
        function to serialize messages. Must support same input as
        JSON, and output must be bytes.
        You can pass a callable directly as `pack`
    unpacker : str (import_string) or callable
        The inverse of packer.  Only necessary if packer is specified as *not* one
        of 'json' or 'pickle'.

    #-------------- ssh related args ----------------
    # These are args for configuring the ssh tunnel to be used
    # credentials are used to forward connections over ssh to the Controller
    # Note that the ip given in `addr` needs to be relative to sshserver
    # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
    # and set sshserver as the same machine the Controller is on. However,
    # the only requirement is that sshserver is able to see the Controller
    # (i.e. is within the same trusted network).

    sshserver : str
        A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
        If keyfile or password is specified, and this is not, it will default to
        the ip given in addr.
    sshkey : str; path to ssh private key file
        This specifies a key to be used in ssh login, default None.
        Regular default ssh keys will be used without specifying this argument.
    password : str
        Your ssh password to sshserver. Note that if this is left None,
        you will be prompted for it if passwordless key based login is unavailable.
    paramiko : bool
        flag for whether to use paramiko instead of shell ssh for tunneling.
        [default: True on win32, False else]

    ------- exec authentication args -------
    If even localhost is untrusted, you can have some protection against
    unauthorized execution by signing messages with HMAC digests.
    Messages are still sent as cleartext, so if someone can snoop your
    loopback traffic this will not protect your privacy, but will prevent
    unauthorized execution.

    exec_key : str
        an authentication key or file containing a key
        default: None


    Attributes
    ----------

    ids : list of int engine IDs
        requesting the ids attribute always synchronizes
        the registration state. To request ids without synchronization,
        use semi-private _ids attributes.

    history : list of msg_ids
        a list of msg_ids, keeping track of all the execution
        messages you have submitted in order.

    outstanding : set of msg_ids
        a set of msg_ids that have been submitted, but whose
        results have not yet been received.

    results : dict
        a dict of all our results, keyed by msg_id

    block : bool
        determines default behavior when block not specified
        in execution methods

    Methods
    -------

    spin
        flushes incoming results and registration state changes
        control methods spin, and requesting `ids` also ensures up to date

    wait
        wait on one or more msg_ids

    execution methods
        apply
        legacy: execute, run

    data movement
        push, pull, scatter, gather

    query methods
        queue_status, get_result, purge, result_status

    control methods
        abort, shutdown

    """


    # default for the `block` kwarg of execution methods
    block = Bool(False)
    # msg_ids that have been submitted but whose results are not yet received
    outstanding = Set()
    # msg_id -> result, for all completed requests
    results = Instance('collections.defaultdict', (dict,))
    # msg_id -> Metadata record for each request
    metadata = Instance('collections.defaultdict', (Metadata,))
    # all submitted msg_ids, in submission order
    history = List()
    debug = Bool(False)
    # NOTE(review): presumably a background thread driving spin();
    # set outside this chunk - confirm against the spin machinery
    _spin_thread = Any()
    # threading.Event created in __init__, used to signal spinning to stop
    _stop_spinning = Any()

    profile=Unicode()
345 def _profile_default(self):
345 def _profile_default(self):
346 if BaseIPythonApplication.initialized():
346 if BaseIPythonApplication.initialized():
347 # an IPython app *might* be running, try to get its profile
347 # an IPython app *might* be running, try to get its profile
348 try:
348 try:
349 return BaseIPythonApplication.instance().profile
349 return BaseIPythonApplication.instance().profile
350 except (AttributeError, MultipleInstanceError):
350 except (AttributeError, MultipleInstanceError):
351 # could be a *different* subclass of config.Application,
351 # could be a *different* subclass of config.Application,
352 # which would raise one of these two errors.
352 # which would raise one of these two errors.
353 return u'default'
353 return u'default'
354 else:
354 else:
355 return u'default'
355 return u'default'
356
356
357
357
    # engine uuid -> set of outstanding msg_ids on that engine
    _outstanding_dict = Instance('collections.defaultdict', (set,))
    # unsynchronized engine id list; use the `ids` property to sync first
    _ids = List()
    _connected=Bool(False)
    # True when any ssh credential (server, key, or password) was given
    _ssh=Bool(False)
    # zmq context; may be the shared global instance (see __init__)
    _context = Instance('zmq.Context')
    # connection config loaded from args/json in __init__
    _config = Dict()
    # bidirectional engine id <-> uuid mapping
    _engines=Instance(util.ReverseDict, (), {})
    # _hub_socket=Instance('zmq.Socket')
    # DEALER socket to the Hub, connected in __init__
    _query_socket=Instance('zmq.Socket')
    _control_socket=Instance('zmq.Socket')
    _iopub_socket=Instance('zmq.Socket')
    _notification_socket=Instance('zmq.Socket')
    _mux_socket=Instance('zmq.Socket')
    _task_socket=Instance('zmq.Socket')
    _task_scheme=Unicode()
    _closed = False
    _ignored_control_replies=Integer(0)
    _ignored_hub_replies=Integer(0)
376
376
    def __new__(self, *args, **kw):
        # don't raise on positional args
        # (HasTraits.__new__ takes only keywords; the positional args are
        # consumed by our __init__, so they are dropped here)
        return HasTraits.__new__(self, **kw)
380
380
    def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
            context=None, debug=False, exec_key=None,
            sshserver=None, sshkey=None, password=None, paramiko=None,
            timeout=10, **extra_args
            ):
        """Connect to a Hub.

        Resolves connection info (from a url, a connection file, or the
        profile's default connection file), sets up ssh tunneling when
        needed, builds the Session and query socket, registers message
        handlers, and finally activates parallel magics when running
        inside IPython.

        See the class docstring for parameter documentation; **extra_args
        are relayed to the Session constructor.
        """
        if profile:
            super(Client, self).__init__(debug=debug, profile=profile)
        else:
            super(Client, self).__init__(debug=debug)
        if context is None:
            # share the process-global context by default
            context = zmq.Context.instance()
        self._context = context
        self._stop_spinning = Event()

        self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
        if self._cd is not None:
            # fall back on the profile's default connection file
            if url_or_file is None:
                url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
        if url_or_file is None:
            raise ValueError(
                "I can't find enough information to connect to a hub!"
                " Please specify at least one of url_or_file or profile."
            )

        if not util.is_url(url_or_file):
            # it's not a url, try for a file
            if not os.path.exists(url_or_file):
                # retry relative to the profile's security dir
                if self._cd:
                    url_or_file = os.path.join(self._cd.security_dir, url_or_file)
                if not os.path.exists(url_or_file):
                    raise IOError("Connection file not found: %r" % url_or_file)
            with open(url_or_file) as f:
                cfg = json.loads(f.read())
        else:
            cfg = {'url':url_or_file}

        # sync defaults from args, json:
        if sshserver:
            cfg['ssh'] = sshserver
        if exec_key:
            cfg['exec_key'] = exec_key
        exec_key = cfg['exec_key']
        location = cfg.setdefault('location', None)
        cfg['url'] = util.disambiguate_url(cfg['url'], location)
        url = cfg['url']
        proto,addr,port = util.split_url(url)
        if location is not None and addr == '127.0.0.1':
            # location specified, and connection is expected to be local
            if location not in LOCAL_IPS and not sshserver:
                # load ssh from JSON *only* if the controller is not on
                # this machine
                sshserver=cfg['ssh']
            if location not in LOCAL_IPS and not sshserver:
                # warn if no ssh specified, but SSH is probably needed
                # This is only a warning, because the most likely cause
                # is a local Controller on a laptop whose IP is dynamic
                warnings.warn("""
            Controller appears to be listening on localhost, but not on this machine.
            If this is true, you should specify Client(...,sshserver='you@%s')
            or instruct your controller to listen on an external IP."""%location,
                RuntimeWarning)
        elif not sshserver:
            # otherwise sync with cfg
            sshserver = cfg['ssh']

        self._config = cfg

        # tunnel if any ssh credential was given
        self._ssh = bool(sshserver or sshkey or password)
        if self._ssh and sshserver is None:
            # default to ssh via localhost
            sshserver = url.split('://')[1].split(':')[0]
        if self._ssh and password is None:
            if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
                password=False
            else:
                password = getpass("SSH Password for %s: "%sshserver)
        ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)

        # configure and construct the session
        if exec_key is not None:
            # a filename means a keyfile; a bare string is the key itself
            if os.path.isfile(exec_key):
                extra_args['keyfile'] = exec_key
            else:
                exec_key = cast_bytes(exec_key)
                extra_args['key'] = exec_key
        self.session = Session(**extra_args)

        self._query_socket = self._context.socket(zmq.DEALER)
        # identify ourselves to the Hub by session id
        self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
        if self._ssh:
            tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
        else:
            self._query_socket.connect(url)

        self.session.debug = self.debug

        self._notification_handlers = {'registration_notification' : self._register_engine,
                                    'unregistration_notification' : self._unregister_engine,
                                    'shutdown_notification' : lambda msg: self.close(),
                                    }
        self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
                                'apply_reply' : self._handle_apply_reply}
        self._connect(sshserver, ssh_kwargs, timeout)

        # last step: setup magics, if we are in IPython:

        try:
            ip = get_ipython()
        except NameError:
            return
        else:
            if 'px' not in ip.magics_manager.magics:
                # in IPython but we are the first Client.
                # activate a default view for parallel magics.
                self.activate()
496
496
497 def __del__(self):
497 def __del__(self):
498 """cleanup sockets, but _not_ context."""
498 """cleanup sockets, but _not_ context."""
499 self.close()
499 self.close()
500
500
    def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
        """Locate the ProfileDir for this client and store it on self._cd.

        Resolution order: an explicit profile_dir wins over a profile name
        looked up under ipython_dir.  If neither lookup succeeds, self._cd
        is set to None (callers must handle the no-profile case).
        """
        if ipython_dir is None:
            ipython_dir = get_ipython_dir()
        if profile_dir is not None:
            try:
                self._cd = ProfileDir.find_profile_dir(profile_dir)
                return
            except ProfileDirError:
                # fall through to the None default below
                pass
        elif profile is not None:
            try:
                self._cd = ProfileDir.find_profile_dir_by_name(
                    ipython_dir, profile)
                return
            except ProfileDirError:
                pass
        self._cd = None
    def _update_engines(self, engines):
        """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
        for k,v in engines.iteritems():
            eid = int(k)
            self._engines[eid] = v
            self._ids.append(eid)
        self._ids = sorted(self._ids)
        # pure ZMQ scheduling requires engine ids to be the contiguous set
        # 0..n-1; if unregistration has left a gap, task farming must stop.
        if sorted(self._engines.keys()) != range(len(self._engines)) and \
                        self._task_scheme == 'pure' and self._task_socket:
            self._stop_scheduling_tasks()
530 def _stop_scheduling_tasks(self):
530 def _stop_scheduling_tasks(self):
531 """Stop scheduling tasks because an engine has been unregistered
531 """Stop scheduling tasks because an engine has been unregistered
532 from a pure ZMQ scheduler.
532 from a pure ZMQ scheduler.
533 """
533 """
534 self._task_socket.close()
534 self._task_socket.close()
535 self._task_socket = None
535 self._task_socket = None
536 msg = "An engine has been unregistered, and we are using pure " +\
536 msg = "An engine has been unregistered, and we are using pure " +\
537 "ZMQ task scheduling. Task farming will be disabled."
537 "ZMQ task scheduling. Task farming will be disabled."
538 if self.outstanding:
538 if self.outstanding:
539 msg += " If you were running tasks when this happened, " +\
539 msg += " If you were running tasks when this happened, " +\
540 "some `outstanding` msg_ids may never resolve."
540 "some `outstanding` msg_ids may never resolve."
541 warnings.warn(msg, RuntimeWarning)
541 warnings.warn(msg, RuntimeWarning)
542
542
    def _build_targets(self, targets):
        """Turn valid target IDs or 'all' into two lists:
        (int_ids, uuids).

        Raises NoEnginesRegistered when no engines are connected, IndexError
        for an unknown engine id, and TypeError for an invalid spec.
        """
        if not self._ids:
            # flush notification socket if no engines yet, just in case
            # (reading self.ids triggers _flush_notifications)
            if not self.ids:
                raise error.NoEnginesRegistered("Can't build targets without any engines")

        if targets is None:
            targets = self._ids
        elif isinstance(targets, basestring):
            if targets.lower() == 'all':
                targets = self._ids
            else:
                raise TypeError("%r not valid str target, must be 'all'"%(targets))
        elif isinstance(targets, int):
            # negative ids index from the end, like list indexing
            if targets < 0:
                targets = self.ids[targets]
            if targets not in self._ids:
                raise IndexError("No such engine: %i"%targets)
            targets = [targets]

        if isinstance(targets, slice):
            indices = range(len(self._ids))[targets]
            ids = self.ids
            targets = [ ids[i] for i in indices ]

        if not isinstance(targets, (tuple, list, xrange)):
            raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))

        # uuids as bytes (socket identities), int ids as a plain list
        return [cast_bytes(self._engines[t]) for t in targets], list(targets)
    def _connect(self, sshserver, ssh_kwargs, timeout):
        """setup all our socket connections to the cluster. This is called from
        __init__.

        Sends a connection_request over the query socket, waits up to
        `timeout` seconds for the Hub's reply, then connects one socket per
        channel the Hub advertises (mux, task, notification, control, iopub).
        """

        # Maybe allow reconnecting?
        if self._connected:
            return
        self._connected=True

        def connect_socket(s, url):
            # resolve url relative to the Hub's location, tunneling if ssh
            url = util.disambiguate_url(url, self._config['location'])
            if self._ssh:
                return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
            else:
                return s.connect(url)

        self.session.send(self._query_socket, 'connection_request')
        # use Poller because zmq.select has wrong units in pyzmq 2.1.7
        poller = zmq.Poller()
        poller.register(self._query_socket, zmq.POLLIN)
        # poll expects milliseconds, timeout is seconds
        evts = poller.poll(timeout*1000)
        if not evts:
            raise error.TimeoutError("Hub connection request timed out")
        idents,msg = self.session.recv(self._query_socket,mode=0)
        if self.debug:
            pprint(msg)
        msg = Message(msg)
        content = msg.content
        self._config['registration'] = dict(content)
        if content.status == 'ok':
            # every DEALER socket shares our session identity
            ident = self.session.bsession
            if content.mux:
                self._mux_socket = self._context.socket(zmq.DEALER)
                self._mux_socket.setsockopt(zmq.IDENTITY, ident)
                connect_socket(self._mux_socket, content.mux)
            if content.task:
                # content.task is (scheme, address)
                self._task_scheme, task_addr = content.task
                self._task_socket = self._context.socket(zmq.DEALER)
                self._task_socket.setsockopt(zmq.IDENTITY, ident)
                connect_socket(self._task_socket, task_addr)
            if content.notification:
                # SUB socket subscribed to everything
                self._notification_socket = self._context.socket(zmq.SUB)
                connect_socket(self._notification_socket, content.notification)
                self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
            # if content.query:
            #     self._query_socket = self._context.socket(zmq.DEALER)
            #     self._query_socket.setsockopt(zmq.IDENTITY, self.session.bsession)
            #     connect_socket(self._query_socket, content.query)
            if content.control:
                self._control_socket = self._context.socket(zmq.DEALER)
                self._control_socket.setsockopt(zmq.IDENTITY, ident)
                connect_socket(self._control_socket, content.control)
            if content.iopub:
                self._iopub_socket = self._context.socket(zmq.SUB)
                self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
                self._iopub_socket.setsockopt(zmq.IDENTITY, ident)
                connect_socket(self._iopub_socket, content.iopub)
            self._update_engines(dict(content.engines))
        else:
            self._connected = False
            raise Exception("Failed to connect!")
639 #--------------------------------------------------------------------------
639 #--------------------------------------------------------------------------
640 # handlers and callbacks for incoming messages
640 # handlers and callbacks for incoming messages
641 #--------------------------------------------------------------------------
641 #--------------------------------------------------------------------------
642
642
643 def _unwrap_exception(self, content):
643 def _unwrap_exception(self, content):
644 """unwrap exception, and remap engine_id to int."""
644 """unwrap exception, and remap engine_id to int."""
645 e = error.unwrap_exception(content)
645 e = error.unwrap_exception(content)
646 # print e.traceback
646 # print e.traceback
647 if e.engine_info:
647 if e.engine_info:
648 e_uuid = e.engine_info['engine_uuid']
648 e_uuid = e.engine_info['engine_uuid']
649 eid = self._engines[e_uuid]
649 eid = self._engines[e_uuid]
650 e.engine_info['engine_id'] = eid
650 e.engine_info['engine_id'] = eid
651 return e
651 return e
652
652
653 def _extract_metadata(self, header, parent, content):
653 def _extract_metadata(self, header, parent, content):
654 md = {'msg_id' : parent['msg_id'],
654 md = {'msg_id' : parent['msg_id'],
655 'received' : datetime.now(),
655 'received' : datetime.now(),
656 'engine_uuid' : header.get('engine', None),
656 'engine_uuid' : header.get('engine', None),
657 'follow' : parent.get('follow', []),
657 'follow' : parent.get('follow', []),
658 'after' : parent.get('after', []),
658 'after' : parent.get('after', []),
659 'status' : content['status'],
659 'status' : content['status'],
660 }
660 }
661
661
662 if md['engine_uuid'] is not None:
662 if md['engine_uuid'] is not None:
663 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
663 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
664
664
665 if 'date' in parent:
665 if 'date' in parent:
666 md['submitted'] = parent['date']
666 md['submitted'] = parent['date']
667 if 'started' in header:
667 if 'started' in header:
668 md['started'] = header['started']
668 md['started'] = header['started']
669 if 'date' in header:
669 if 'date' in header:
670 md['completed'] = header['date']
670 md['completed'] = header['date']
671 return md
671 return md
672
672
673 def _register_engine(self, msg):
673 def _register_engine(self, msg):
674 """Register a new engine, and update our connection info."""
674 """Register a new engine, and update our connection info."""
675 content = msg['content']
675 content = msg['content']
676 eid = content['id']
676 eid = content['id']
677 d = {eid : content['queue']}
677 d = {eid : content['queue']}
678 self._update_engines(d)
678 self._update_engines(d)
679
679
    def _unregister_engine(self, msg):
        """Unregister an engine that has died."""
        content = msg['content']
        eid = int(content['id'])
        if eid in self._ids:
            # remove from both the id list and the id->uuid map
            self._ids.remove(eid)
            uuid = self._engines.pop(eid)

            # mark any tasks still pending on that engine as failed
            self._handle_stranded_msgs(eid, uuid)

        if self._task_socket and self._task_scheme == 'pure':
            # pure ZMQ scheduling cannot survive the loss of an engine
            self._stop_scheduling_tasks()
    def _handle_stranded_msgs(self, eid, uuid):
        """Handle messages known to be on an engine when the engine unregisters.

        It is possible that this will fire prematurely - that is, an engine will
        go down after completing a result, and the client will be notified
        of the unregistration and later receive the successful result.
        """

        outstanding = self._outstanding_dict[uuid]

        for msg_id in list(outstanding):
            if msg_id in self.results:
                # we already have this result; nothing to do
                continue
            try:
                # raise/except so wrap_exception captures a real traceback
                raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
            except:
                content = error.wrap_exception()
            # build a fake message:
            parent = {}
            header = {}
            parent['msg_id'] = msg_id
            header['engine'] = uuid
            header['date'] = datetime.now()
            msg = dict(parent_header=parent, header=header, content=content)
            # deliver the synthesized failure through the normal reply path
            self._handle_apply_reply(msg)
    def _handle_execute_reply(self, msg):
        """Save the reply to an execute_request into our results.

        execute messages are never actually used. apply is used instead.
        """

        parent = msg['parent_header']
        msg_id = parent['msg_id']
        if msg_id not in self.outstanding:
            # duplicate or foreign reply; report but still record it below
            if msg_id in self.history:
                print ("got stale result: %s"%msg_id)
            else:
                print ("got unknown result: %s"%msg_id)
        else:
            self.outstanding.remove(msg_id)

        content = msg['content']
        header = msg['header']

        # construct metadata:
        md = self.metadata[msg_id]
        md.update(self._extract_metadata(header, parent, content))
        # is this redundant?
        self.metadata[msg_id] = md

        # clear the per-engine outstanding entry as well
        e_outstanding = self._outstanding_dict[md['engine_uuid']]
        if msg_id in e_outstanding:
            e_outstanding.remove(msg_id)

        # construct result:
        if content['status'] == 'ok':
            self.results[msg_id] = ExecuteReply(msg_id, content, md)
        elif content['status'] == 'aborted':
            self.results[msg_id] = error.TaskAborted(msg_id)
        elif content['status'] == 'resubmitted':
            # TODO: handle resubmission
            pass
        else:
            # error status: store the remote exception object
            self.results[msg_id] = self._unwrap_exception(content)
    def _handle_apply_reply(self, msg):
        """Save the reply to an apply_request into our results."""
        parent = msg['parent_header']
        msg_id = parent['msg_id']
        if msg_id not in self.outstanding:
            # duplicate or foreign reply; report but still record it below
            if msg_id in self.history:
                print ("got stale result: %s"%msg_id)
                print self.results[msg_id]
                print msg
            else:
                print ("got unknown result: %s"%msg_id)
        else:
            self.outstanding.remove(msg_id)
        content = msg['content']
        header = msg['header']

        # construct metadata:
        md = self.metadata[msg_id]
        md.update(self._extract_metadata(header, parent, content))
        # is this redundant?
        self.metadata[msg_id] = md

        # clear the per-engine outstanding entry as well
        e_outstanding = self._outstanding_dict[md['engine_uuid']]
        if msg_id in e_outstanding:
            e_outstanding.remove(msg_id)

        # construct result:
        if content['status'] == 'ok':
            # the actual return value travels in the buffers, not the content
            self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
        elif content['status'] == 'aborted':
            self.results[msg_id] = error.TaskAborted(msg_id)
        elif content['status'] == 'resubmitted':
            # TODO: handle resubmission
            pass
        else:
            # error status: store the remote exception object
            self.results[msg_id] = self._unwrap_exception(content)
797 def _flush_notifications(self):
797 def _flush_notifications(self):
798 """Flush notifications of engine registrations waiting
798 """Flush notifications of engine registrations waiting
799 in ZMQ queue."""
799 in ZMQ queue."""
800 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
800 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
801 while msg is not None:
801 while msg is not None:
802 if self.debug:
802 if self.debug:
803 pprint(msg)
803 pprint(msg)
804 msg_type = msg['header']['msg_type']
804 msg_type = msg['header']['msg_type']
805 handler = self._notification_handlers.get(msg_type, None)
805 handler = self._notification_handlers.get(msg_type, None)
806 if handler is None:
806 if handler is None:
807 raise Exception("Unhandled message type: %s"%msg.msg_type)
807 raise Exception("Unhandled message type: %s"%msg.msg_type)
808 else:
808 else:
809 handler(msg)
809 handler(msg)
810 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
810 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
811
811
812 def _flush_results(self, sock):
812 def _flush_results(self, sock):
813 """Flush task or queue results waiting in ZMQ queue."""
813 """Flush task or queue results waiting in ZMQ queue."""
814 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
814 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
815 while msg is not None:
815 while msg is not None:
816 if self.debug:
816 if self.debug:
817 pprint(msg)
817 pprint(msg)
818 msg_type = msg['header']['msg_type']
818 msg_type = msg['header']['msg_type']
819 handler = self._queue_handlers.get(msg_type, None)
819 handler = self._queue_handlers.get(msg_type, None)
820 if handler is None:
820 if handler is None:
821 raise Exception("Unhandled message type: %s"%msg.msg_type)
821 raise Exception("Unhandled message type: %s"%msg.msg_type)
822 else:
822 else:
823 handler(msg)
823 handler(msg)
824 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
824 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
825
825
826 def _flush_control(self, sock):
826 def _flush_control(self, sock):
827 """Flush replies from the control channel waiting
827 """Flush replies from the control channel waiting
828 in the ZMQ queue.
828 in the ZMQ queue.
829
829
830 Currently: ignore them."""
830 Currently: ignore them."""
831 if self._ignored_control_replies <= 0:
831 if self._ignored_control_replies <= 0:
832 return
832 return
833 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
833 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
834 while msg is not None:
834 while msg is not None:
835 self._ignored_control_replies -= 1
835 self._ignored_control_replies -= 1
836 if self.debug:
836 if self.debug:
837 pprint(msg)
837 pprint(msg)
838 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
838 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
839
839
840 def _flush_ignored_control(self):
840 def _flush_ignored_control(self):
841 """flush ignored control replies"""
841 """flush ignored control replies"""
842 while self._ignored_control_replies > 0:
842 while self._ignored_control_replies > 0:
843 self.session.recv(self._control_socket)
843 self.session.recv(self._control_socket)
844 self._ignored_control_replies -= 1
844 self._ignored_control_replies -= 1
845
845
846 def _flush_ignored_hub_replies(self):
846 def _flush_ignored_hub_replies(self):
847 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
847 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
848 while msg is not None:
848 while msg is not None:
849 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
849 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
850
850
851 def _flush_iopub(self, sock):
851 def _flush_iopub(self, sock):
852 """Flush replies from the iopub channel waiting
852 """Flush replies from the iopub channel waiting
853 in the ZMQ queue.
853 in the ZMQ queue.
854 """
854 """
855 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
855 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
856 while msg is not None:
856 while msg is not None:
857 if self.debug:
857 if self.debug:
858 pprint(msg)
858 pprint(msg)
859 parent = msg['parent_header']
859 parent = msg['parent_header']
860 # ignore IOPub messages with no parent.
860 # ignore IOPub messages with no parent.
861 # Caused by print statements or warnings from before the first execution.
861 # Caused by print statements or warnings from before the first execution.
862 if not parent:
862 if not parent:
863 continue
863 continue
864 msg_id = parent['msg_id']
864 msg_id = parent['msg_id']
865 content = msg['content']
865 content = msg['content']
866 header = msg['header']
866 header = msg['header']
867 msg_type = msg['header']['msg_type']
867 msg_type = msg['header']['msg_type']
868
868
869 # init metadata:
869 # init metadata:
870 md = self.metadata[msg_id]
870 md = self.metadata[msg_id]
871
871
872 if msg_type == 'stream':
872 if msg_type == 'stream':
873 name = content['name']
873 name = content['name']
874 s = md[name] or ''
874 s = md[name] or ''
875 md[name] = s + content['data']
875 md[name] = s + content['data']
876 elif msg_type == 'pyerr':
876 elif msg_type == 'pyerr':
877 md.update({'pyerr' : self._unwrap_exception(content)})
877 md.update({'pyerr' : self._unwrap_exception(content)})
878 elif msg_type == 'pyin':
878 elif msg_type == 'pyin':
879 md.update({'pyin' : content['code']})
879 md.update({'pyin' : content['code']})
880 elif msg_type == 'display_data':
880 elif msg_type == 'display_data':
881 md['outputs'].append(content)
881 md['outputs'].append(content)
882 elif msg_type == 'pyout':
882 elif msg_type == 'pyout':
883 md['pyout'] = content
883 md['pyout'] = content
884 elif msg_type == 'status':
884 elif msg_type == 'status':
885 # idle message comes after all outputs
885 # idle message comes after all outputs
886 if content['execution_state'] == 'idle':
886 if content['execution_state'] == 'idle':
887 md['outputs_ready'] = True
887 md['outputs_ready'] = True
888 else:
888 else:
889 # unhandled msg_type (status, etc.)
889 # unhandled msg_type (status, etc.)
890 pass
890 pass
891
891
892 # reduntant?
892 # reduntant?
893 self.metadata[msg_id] = md
893 self.metadata[msg_id] = md
894
894
895 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
895 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
896
896
897 #--------------------------------------------------------------------------
897 #--------------------------------------------------------------------------
898 # len, getitem
898 # len, getitem
899 #--------------------------------------------------------------------------
899 #--------------------------------------------------------------------------
900
900
901 def __len__(self):
901 def __len__(self):
902 """len(client) returns # of engines."""
902 """len(client) returns # of engines."""
903 return len(self.ids)
903 return len(self.ids)
904
904
905 def __getitem__(self, key):
905 def __getitem__(self, key):
906 """index access returns DirectView multiplexer objects
906 """index access returns DirectView multiplexer objects
907
907
908 Must be int, slice, or list/tuple/xrange of ints"""
908 Must be int, slice, or list/tuple/xrange of ints"""
909 if not isinstance(key, (int, slice, tuple, list, xrange)):
909 if not isinstance(key, (int, slice, tuple, list, xrange)):
910 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
910 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
911 else:
911 else:
912 return self.direct_view(key)
912 return self.direct_view(key)
913
913
914 #--------------------------------------------------------------------------
914 #--------------------------------------------------------------------------
915 # Begin public methods
915 # Begin public methods
916 #--------------------------------------------------------------------------
916 #--------------------------------------------------------------------------
917
917
    @property
    def ids(self):
        """Always up-to-date ids property."""
        # process any pending (un)registration notifications first,
        # so the returned list reflects the current engine set
        self._flush_notifications()
        # always copy, so callers cannot mutate our internal list:
        return list(self._ids)
925 def activate(self, targets='all', suffix=''):
925 def activate(self, targets='all', suffix=''):
926 """Create a DirectView and register it with IPython magics
926 """Create a DirectView and register it with IPython magics
927
927
928 Defines the magics `%px, %autopx, %pxresult, %%px`
928 Defines the magics `%px, %autopx, %pxresult, %%px`
929
929
930 Parameters
930 Parameters
931 ----------
931 ----------
932
932
933 targets: int, list of ints, or 'all'
933 targets: int, list of ints, or 'all'
934 The engines on which the view's magics will run
934 The engines on which the view's magics will run
935 suffix: str [default: '']
935 suffix: str [default: '']
936 The suffix, if any, for the magics. This allows you to have
936 The suffix, if any, for the magics. This allows you to have
937 multiple views associated with parallel magics at the same time.
937 multiple views associated with parallel magics at the same time.
938
938
939 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
939 e.g. ``rc.activate(targets=0, suffix='0')`` will give you
940 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
940 the magics ``%px0``, ``%pxresult0``, etc. for running magics just
941 on engine 0.
941 on engine 0.
942 """
942 """
943 view = self.direct_view(targets)
943 view = self.direct_view(targets)
944 view.block = True
944 view.block = True
945 view.activate(suffix)
945 view.activate(suffix)
946 return view
946 return view
947
947
948 def close(self):
948 def close(self):
949 if self._closed:
949 if self._closed:
950 return
950 return
951 self.stop_spin_thread()
951 self.stop_spin_thread()
952 snames = filter(lambda n: n.endswith('socket'), dir(self))
952 snames = filter(lambda n: n.endswith('socket'), dir(self))
953 for socket in map(lambda name: getattr(self, name), snames):
953 for socket in map(lambda name: getattr(self, name), snames):
954 if isinstance(socket, zmq.Socket) and not socket.closed:
954 if isinstance(socket, zmq.Socket) and not socket.closed:
955 socket.close()
955 socket.close()
956 self._closed = True
956 self._closed = True
957
957
958 def _spin_every(self, interval=1):
958 def _spin_every(self, interval=1):
959 """target func for use in spin_thread"""
959 """target func for use in spin_thread"""
960 while True:
960 while True:
961 if self._stop_spinning.is_set():
961 if self._stop_spinning.is_set():
962 return
962 return
963 time.sleep(interval)
963 time.sleep(interval)
964 self.spin()
964 self.spin()
965
965
966 def spin_thread(self, interval=1):
966 def spin_thread(self, interval=1):
967 """call Client.spin() in a background thread on some regular interval
967 """call Client.spin() in a background thread on some regular interval
968
968
969 This helps ensure that messages don't pile up too much in the zmq queue
969 This helps ensure that messages don't pile up too much in the zmq queue
970 while you are working on other things, or just leaving an idle terminal.
970 while you are working on other things, or just leaving an idle terminal.
971
971
972 It also helps limit potential padding of the `received` timestamp
972 It also helps limit potential padding of the `received` timestamp
973 on AsyncResult objects, used for timings.
973 on AsyncResult objects, used for timings.
974
974
975 Parameters
975 Parameters
976 ----------
976 ----------
977
977
978 interval : float, optional
978 interval : float, optional
979 The interval on which to spin the client in the background thread
979 The interval on which to spin the client in the background thread
980 (simply passed to time.sleep).
980 (simply passed to time.sleep).
981
981
982 Notes
982 Notes
983 -----
983 -----
984
984
985 For precision timing, you may want to use this method to put a bound
985 For precision timing, you may want to use this method to put a bound
986 on the jitter (in seconds) in `received` timestamps used
986 on the jitter (in seconds) in `received` timestamps used
987 in AsyncResult.wall_time.
987 in AsyncResult.wall_time.
988
988
989 """
989 """
990 if self._spin_thread is not None:
990 if self._spin_thread is not None:
991 self.stop_spin_thread()
991 self.stop_spin_thread()
992 self._stop_spinning.clear()
992 self._stop_spinning.clear()
993 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
993 self._spin_thread = Thread(target=self._spin_every, args=(interval,))
994 self._spin_thread.daemon = True
994 self._spin_thread.daemon = True
995 self._spin_thread.start()
995 self._spin_thread.start()
996
996
997 def stop_spin_thread(self):
997 def stop_spin_thread(self):
998 """stop background spin_thread, if any"""
998 """stop background spin_thread, if any"""
999 if self._spin_thread is not None:
999 if self._spin_thread is not None:
1000 self._stop_spinning.set()
1000 self._stop_spinning.set()
1001 self._spin_thread.join()
1001 self._spin_thread.join()
1002 self._spin_thread = None
1002 self._spin_thread = None
1003
1003
1004 def spin(self):
1004 def spin(self):
1005 """Flush any registration notifications and execution results
1005 """Flush any registration notifications and execution results
1006 waiting in the ZMQ queue.
1006 waiting in the ZMQ queue.
1007 """
1007 """
1008 if self._notification_socket:
1008 if self._notification_socket:
1009 self._flush_notifications()
1009 self._flush_notifications()
1010 if self._iopub_socket:
1010 if self._iopub_socket:
1011 self._flush_iopub(self._iopub_socket)
1011 self._flush_iopub(self._iopub_socket)
1012 if self._mux_socket:
1012 if self._mux_socket:
1013 self._flush_results(self._mux_socket)
1013 self._flush_results(self._mux_socket)
1014 if self._task_socket:
1014 if self._task_socket:
1015 self._flush_results(self._task_socket)
1015 self._flush_results(self._task_socket)
1016 if self._control_socket:
1016 if self._control_socket:
1017 self._flush_control(self._control_socket)
1017 self._flush_control(self._control_socket)
1018 if self._query_socket:
1018 if self._query_socket:
1019 self._flush_ignored_hub_replies()
1019 self._flush_ignored_hub_replies()
1020
1020
1021 def wait(self, jobs=None, timeout=-1):
1021 def wait(self, jobs=None, timeout=-1):
1022 """waits on one or more `jobs`, for up to `timeout` seconds.
1022 """waits on one or more `jobs`, for up to `timeout` seconds.
1023
1023
1024 Parameters
1024 Parameters
1025 ----------
1025 ----------
1026
1026
1027 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1027 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1028 ints are indices to self.history
1028 ints are indices to self.history
1029 strs are msg_ids
1029 strs are msg_ids
1030 default: wait on all outstanding messages
1030 default: wait on all outstanding messages
1031 timeout : float
1031 timeout : float
1032 a time in seconds, after which to give up.
1032 a time in seconds, after which to give up.
1033 default is -1, which means no timeout
1033 default is -1, which means no timeout
1034
1034
1035 Returns
1035 Returns
1036 -------
1036 -------
1037
1037
1038 True : when all msg_ids are done
1038 True : when all msg_ids are done
1039 False : timeout reached, some msg_ids still outstanding
1039 False : timeout reached, some msg_ids still outstanding
1040 """
1040 """
1041 tic = time.time()
1041 tic = time.time()
1042 if jobs is None:
1042 if jobs is None:
1043 theids = self.outstanding
1043 theids = self.outstanding
1044 else:
1044 else:
1045 if isinstance(jobs, (int, basestring, AsyncResult)):
1045 if isinstance(jobs, (int, basestring, AsyncResult)):
1046 jobs = [jobs]
1046 jobs = [jobs]
1047 theids = set()
1047 theids = set()
1048 for job in jobs:
1048 for job in jobs:
1049 if isinstance(job, int):
1049 if isinstance(job, int):
1050 # index access
1050 # index access
1051 job = self.history[job]
1051 job = self.history[job]
1052 elif isinstance(job, AsyncResult):
1052 elif isinstance(job, AsyncResult):
1053 map(theids.add, job.msg_ids)
1053 map(theids.add, job.msg_ids)
1054 continue
1054 continue
1055 theids.add(job)
1055 theids.add(job)
1056 if not theids.intersection(self.outstanding):
1056 if not theids.intersection(self.outstanding):
1057 return True
1057 return True
1058 self.spin()
1058 self.spin()
1059 while theids.intersection(self.outstanding):
1059 while theids.intersection(self.outstanding):
1060 if timeout >= 0 and ( time.time()-tic ) > timeout:
1060 if timeout >= 0 and ( time.time()-tic ) > timeout:
1061 break
1061 break
1062 time.sleep(1e-3)
1062 time.sleep(1e-3)
1063 self.spin()
1063 self.spin()
1064 return len(theids.intersection(self.outstanding)) == 0
1064 return len(theids.intersection(self.outstanding)) == 0
1065
1065
1066 #--------------------------------------------------------------------------
1066 #--------------------------------------------------------------------------
1067 # Control methods
1067 # Control methods
1068 #--------------------------------------------------------------------------
1068 #--------------------------------------------------------------------------
1069
1069
1070 @spin_first
1070 @spin_first
1071 def clear(self, targets=None, block=None):
1071 def clear(self, targets=None, block=None):
1072 """Clear the namespace in target(s)."""
1072 """Clear the namespace in target(s)."""
1073 block = self.block if block is None else block
1073 block = self.block if block is None else block
1074 targets = self._build_targets(targets)[0]
1074 targets = self._build_targets(targets)[0]
1075 for t in targets:
1075 for t in targets:
1076 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1076 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1077 error = False
1077 error = False
1078 if block:
1078 if block:
1079 self._flush_ignored_control()
1079 self._flush_ignored_control()
1080 for i in range(len(targets)):
1080 for i in range(len(targets)):
1081 idents,msg = self.session.recv(self._control_socket,0)
1081 idents,msg = self.session.recv(self._control_socket,0)
1082 if self.debug:
1082 if self.debug:
1083 pprint(msg)
1083 pprint(msg)
1084 if msg['content']['status'] != 'ok':
1084 if msg['content']['status'] != 'ok':
1085 error = self._unwrap_exception(msg['content'])
1085 error = self._unwrap_exception(msg['content'])
1086 else:
1086 else:
1087 self._ignored_control_replies += len(targets)
1087 self._ignored_control_replies += len(targets)
1088 if error:
1088 if error:
1089 raise error
1089 raise error
1090
1090
1091
1091
1092 @spin_first
1092 @spin_first
1093 def abort(self, jobs=None, targets=None, block=None):
1093 def abort(self, jobs=None, targets=None, block=None):
1094 """Abort specific jobs from the execution queues of target(s).
1094 """Abort specific jobs from the execution queues of target(s).
1095
1095
1096 This is a mechanism to prevent jobs that have already been submitted
1096 This is a mechanism to prevent jobs that have already been submitted
1097 from executing.
1097 from executing.
1098
1098
1099 Parameters
1099 Parameters
1100 ----------
1100 ----------
1101
1101
1102 jobs : msg_id, list of msg_ids, or AsyncResult
1102 jobs : msg_id, list of msg_ids, or AsyncResult
1103 The jobs to be aborted
1103 The jobs to be aborted
1104
1104
1105 If unspecified/None: abort all outstanding jobs.
1105 If unspecified/None: abort all outstanding jobs.
1106
1106
1107 """
1107 """
1108 block = self.block if block is None else block
1108 block = self.block if block is None else block
1109 jobs = jobs if jobs is not None else list(self.outstanding)
1109 jobs = jobs if jobs is not None else list(self.outstanding)
1110 targets = self._build_targets(targets)[0]
1110 targets = self._build_targets(targets)[0]
1111
1111
1112 msg_ids = []
1112 msg_ids = []
1113 if isinstance(jobs, (basestring,AsyncResult)):
1113 if isinstance(jobs, (basestring,AsyncResult)):
1114 jobs = [jobs]
1114 jobs = [jobs]
1115 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1115 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1116 if bad_ids:
1116 if bad_ids:
1117 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1117 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1118 for j in jobs:
1118 for j in jobs:
1119 if isinstance(j, AsyncResult):
1119 if isinstance(j, AsyncResult):
1120 msg_ids.extend(j.msg_ids)
1120 msg_ids.extend(j.msg_ids)
1121 else:
1121 else:
1122 msg_ids.append(j)
1122 msg_ids.append(j)
1123 content = dict(msg_ids=msg_ids)
1123 content = dict(msg_ids=msg_ids)
1124 for t in targets:
1124 for t in targets:
1125 self.session.send(self._control_socket, 'abort_request',
1125 self.session.send(self._control_socket, 'abort_request',
1126 content=content, ident=t)
1126 content=content, ident=t)
1127 error = False
1127 error = False
1128 if block:
1128 if block:
1129 self._flush_ignored_control()
1129 self._flush_ignored_control()
1130 for i in range(len(targets)):
1130 for i in range(len(targets)):
1131 idents,msg = self.session.recv(self._control_socket,0)
1131 idents,msg = self.session.recv(self._control_socket,0)
1132 if self.debug:
1132 if self.debug:
1133 pprint(msg)
1133 pprint(msg)
1134 if msg['content']['status'] != 'ok':
1134 if msg['content']['status'] != 'ok':
1135 error = self._unwrap_exception(msg['content'])
1135 error = self._unwrap_exception(msg['content'])
1136 else:
1136 else:
1137 self._ignored_control_replies += len(targets)
1137 self._ignored_control_replies += len(targets)
1138 if error:
1138 if error:
1139 raise error
1139 raise error
1140
1140
1141 @spin_first
1141 @spin_first
1142 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1142 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1143 """Terminates one or more engine processes, optionally including the hub.
1143 """Terminates one or more engine processes, optionally including the hub.
1144
1144
1145 Parameters
1145 Parameters
1146 ----------
1146 ----------
1147
1147
1148 targets: list of ints or 'all' [default: all]
1148 targets: list of ints or 'all' [default: all]
1149 Which engines to shutdown.
1149 Which engines to shutdown.
1150 hub: bool [default: False]
1150 hub: bool [default: False]
1151 Whether to include the Hub. hub=True implies targets='all'.
1151 Whether to include the Hub. hub=True implies targets='all'.
1152 block: bool [default: self.block]
1152 block: bool [default: self.block]
1153 Whether to wait for clean shutdown replies or not.
1153 Whether to wait for clean shutdown replies or not.
1154 restart: bool [default: False]
1154 restart: bool [default: False]
1155 NOT IMPLEMENTED
1155 NOT IMPLEMENTED
1156 whether to restart engines after shutting them down.
1156 whether to restart engines after shutting them down.
1157 """
1157 """
1158
1158
1159 if restart:
1159 if restart:
1160 raise NotImplementedError("Engine restart is not yet implemented")
1160 raise NotImplementedError("Engine restart is not yet implemented")
1161
1161
1162 block = self.block if block is None else block
1162 block = self.block if block is None else block
1163 if hub:
1163 if hub:
1164 targets = 'all'
1164 targets = 'all'
1165 targets = self._build_targets(targets)[0]
1165 targets = self._build_targets(targets)[0]
1166 for t in targets:
1166 for t in targets:
1167 self.session.send(self._control_socket, 'shutdown_request',
1167 self.session.send(self._control_socket, 'shutdown_request',
1168 content={'restart':restart},ident=t)
1168 content={'restart':restart},ident=t)
1169 error = False
1169 error = False
1170 if block or hub:
1170 if block or hub:
1171 self._flush_ignored_control()
1171 self._flush_ignored_control()
1172 for i in range(len(targets)):
1172 for i in range(len(targets)):
1173 idents,msg = self.session.recv(self._control_socket, 0)
1173 idents,msg = self.session.recv(self._control_socket, 0)
1174 if self.debug:
1174 if self.debug:
1175 pprint(msg)
1175 pprint(msg)
1176 if msg['content']['status'] != 'ok':
1176 if msg['content']['status'] != 'ok':
1177 error = self._unwrap_exception(msg['content'])
1177 error = self._unwrap_exception(msg['content'])
1178 else:
1178 else:
1179 self._ignored_control_replies += len(targets)
1179 self._ignored_control_replies += len(targets)
1180
1180
1181 if hub:
1181 if hub:
1182 time.sleep(0.25)
1182 time.sleep(0.25)
1183 self.session.send(self._query_socket, 'shutdown_request')
1183 self.session.send(self._query_socket, 'shutdown_request')
1184 idents,msg = self.session.recv(self._query_socket, 0)
1184 idents,msg = self.session.recv(self._query_socket, 0)
1185 if self.debug:
1185 if self.debug:
1186 pprint(msg)
1186 pprint(msg)
1187 if msg['content']['status'] != 'ok':
1187 if msg['content']['status'] != 'ok':
1188 error = self._unwrap_exception(msg['content'])
1188 error = self._unwrap_exception(msg['content'])
1189
1189
1190 if error:
1190 if error:
1191 raise error
1191 raise error
1192
1192
1193 #--------------------------------------------------------------------------
1193 #--------------------------------------------------------------------------
1194 # Execution related methods
1194 # Execution related methods
1195 #--------------------------------------------------------------------------
1195 #--------------------------------------------------------------------------
1196
1196
1197 def _maybe_raise(self, result):
1197 def _maybe_raise(self, result):
1198 """wrapper for maybe raising an exception if apply failed."""
1198 """wrapper for maybe raising an exception if apply failed."""
1199 if isinstance(result, error.RemoteError):
1199 if isinstance(result, error.RemoteError):
1200 raise result
1200 raise result
1201
1201
1202 return result
1202 return result
1203
1203
1204 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1204 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1205 ident=None):
1205 ident=None):
1206 """construct and send an apply message via a socket.
1206 """construct and send an apply message via a socket.
1207
1207
1208 This is the principal method with which all engine execution is performed by views.
1208 This is the principal method with which all engine execution is performed by views.
1209 """
1209 """
1210
1210
1211 if self._closed:
1211 if self._closed:
1212 raise RuntimeError("Client cannot be used after its sockets have been closed")
1212 raise RuntimeError("Client cannot be used after its sockets have been closed")
1213
1213
1214 # defaults:
1214 # defaults:
1215 args = args if args is not None else []
1215 args = args if args is not None else []
1216 kwargs = kwargs if kwargs is not None else {}
1216 kwargs = kwargs if kwargs is not None else {}
1217 subheader = subheader if subheader is not None else {}
1217 subheader = subheader if subheader is not None else {}
1218
1218
1219 # validate arguments
1219 # validate arguments
1220 if not callable(f) and not isinstance(f, Reference):
1220 if not callable(f) and not isinstance(f, Reference):
1221 raise TypeError("f must be callable, not %s"%type(f))
1221 raise TypeError("f must be callable, not %s"%type(f))
1222 if not isinstance(args, (tuple, list)):
1222 if not isinstance(args, (tuple, list)):
1223 raise TypeError("args must be tuple or list, not %s"%type(args))
1223 raise TypeError("args must be tuple or list, not %s"%type(args))
1224 if not isinstance(kwargs, dict):
1224 if not isinstance(kwargs, dict):
1225 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1225 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1226 if not isinstance(subheader, dict):
1226 if not isinstance(subheader, dict):
1227 raise TypeError("subheader must be dict, not %s"%type(subheader))
1227 raise TypeError("subheader must be dict, not %s"%type(subheader))
1228
1228
1229 bufs = util.pack_apply_message(f,args,kwargs)
1229 bufs = util.pack_apply_message(f,args,kwargs)
1230
1230
1231 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1231 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1232 subheader=subheader, track=track)
1232 subheader=subheader, track=track)
1233
1233
1234 msg_id = msg['header']['msg_id']
1234 msg_id = msg['header']['msg_id']
1235 self.outstanding.add(msg_id)
1235 self.outstanding.add(msg_id)
1236 if ident:
1236 if ident:
1237 # possibly routed to a specific engine
1237 # possibly routed to a specific engine
1238 if isinstance(ident, list):
1238 if isinstance(ident, list):
1239 ident = ident[-1]
1239 ident = ident[-1]
1240 if ident in self._engines.values():
1240 if ident in self._engines.values():
1241 # save for later, in case of engine death
1241 # save for later, in case of engine death
1242 self._outstanding_dict[ident].add(msg_id)
1242 self._outstanding_dict[ident].add(msg_id)
1243 self.history.append(msg_id)
1243 self.history.append(msg_id)
1244 self.metadata[msg_id]['submitted'] = datetime.now()
1244 self.metadata[msg_id]['submitted'] = datetime.now()
1245
1245
1246 return msg
1246 return msg
1247
1247
1248 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1248 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1249 """construct and send an execute request via a socket.
1249 """construct and send an execute request via a socket.
1250
1250
1251 """
1251 """
1252
1252
1253 if self._closed:
1253 if self._closed:
1254 raise RuntimeError("Client cannot be used after its sockets have been closed")
1254 raise RuntimeError("Client cannot be used after its sockets have been closed")
1255
1255
1256 # defaults:
1256 # defaults:
1257 subheader = subheader if subheader is not None else {}
1257 subheader = subheader if subheader is not None else {}
1258
1258
1259 # validate arguments
1259 # validate arguments
1260 if not isinstance(code, basestring):
1260 if not isinstance(code, basestring):
1261 raise TypeError("code must be text, not %s" % type(code))
1261 raise TypeError("code must be text, not %s" % type(code))
1262 if not isinstance(subheader, dict):
1262 if not isinstance(subheader, dict):
1263 raise TypeError("subheader must be dict, not %s" % type(subheader))
1263 raise TypeError("subheader must be dict, not %s" % type(subheader))
1264
1264
1265 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1265 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1266
1266
1267
1267
1268 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1268 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1269 subheader=subheader)
1269 subheader=subheader)
1270
1270
1271 msg_id = msg['header']['msg_id']
1271 msg_id = msg['header']['msg_id']
1272 self.outstanding.add(msg_id)
1272 self.outstanding.add(msg_id)
1273 if ident:
1273 if ident:
1274 # possibly routed to a specific engine
1274 # possibly routed to a specific engine
1275 if isinstance(ident, list):
1275 if isinstance(ident, list):
1276 ident = ident[-1]
1276 ident = ident[-1]
1277 if ident in self._engines.values():
1277 if ident in self._engines.values():
1278 # save for later, in case of engine death
1278 # save for later, in case of engine death
1279 self._outstanding_dict[ident].add(msg_id)
1279 self._outstanding_dict[ident].add(msg_id)
1280 self.history.append(msg_id)
1280 self.history.append(msg_id)
1281 self.metadata[msg_id]['submitted'] = datetime.now()
1281 self.metadata[msg_id]['submitted'] = datetime.now()
1282
1282
1283 return msg
1283 return msg
1284
1284
1285 #--------------------------------------------------------------------------
1285 #--------------------------------------------------------------------------
1286 # construct a View object
1286 # construct a View object
1287 #--------------------------------------------------------------------------
1287 #--------------------------------------------------------------------------
1288
1288
1289 def load_balanced_view(self, targets=None):
1289 def load_balanced_view(self, targets=None):
1290 """construct a DirectView object.
1290 """construct a DirectView object.
1291
1291
1292 If no arguments are specified, create a LoadBalancedView
1292 If no arguments are specified, create a LoadBalancedView
1293 using all engines.
1293 using all engines.
1294
1294
1295 Parameters
1295 Parameters
1296 ----------
1296 ----------
1297
1297
1298 targets: list,slice,int,etc. [default: use all engines]
1298 targets: list,slice,int,etc. [default: use all engines]
1299 The subset of engines across which to load-balance
1299 The subset of engines across which to load-balance
1300 """
1300 """
1301 if targets == 'all':
1301 if targets == 'all':
1302 targets = None
1302 targets = None
1303 if targets is not None:
1303 if targets is not None:
1304 targets = self._build_targets(targets)[1]
1304 targets = self._build_targets(targets)[1]
1305 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1305 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1306
1306
1307 def direct_view(self, targets='all'):
1307 def direct_view(self, targets='all'):
1308 """construct a DirectView object.
1308 """construct a DirectView object.
1309
1309
1310 If no targets are specified, create a DirectView using all engines.
1310 If no targets are specified, create a DirectView using all engines.
1311
1311
1312 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1312 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1313 evaluate the target engines at each execution, whereas rc[:] will connect to
1313 evaluate the target engines at each execution, whereas rc[:] will connect to
1314 all *current* engines, and that list will not change.
1314 all *current* engines, and that list will not change.
1315
1315
1316 That is, 'all' will always use all engines, whereas rc[:] will not use
1316 That is, 'all' will always use all engines, whereas rc[:] will not use
1317 engines added after the DirectView is constructed.
1317 engines added after the DirectView is constructed.
1318
1318
1319 Parameters
1319 Parameters
1320 ----------
1320 ----------
1321
1321
1322 targets: list,slice,int,etc. [default: use all engines]
1322 targets: list,slice,int,etc. [default: use all engines]
1323 The engines to use for the View
1323 The engines to use for the View
1324 """
1324 """
1325 single = isinstance(targets, int)
1325 single = isinstance(targets, int)
1326 # allow 'all' to be lazily evaluated at each execution
1326 # allow 'all' to be lazily evaluated at each execution
1327 if targets != 'all':
1327 if targets != 'all':
1328 targets = self._build_targets(targets)[1]
1328 targets = self._build_targets(targets)[1]
1329 if single:
1329 if single:
1330 targets = targets[0]
1330 targets = targets[0]
1331 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1331 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1332
1332
1333 #--------------------------------------------------------------------------
1333 #--------------------------------------------------------------------------
1334 # Query methods
1334 # Query methods
1335 #--------------------------------------------------------------------------
1335 #--------------------------------------------------------------------------
1336
1336
1337 @spin_first
1337 @spin_first
1338 def get_result(self, indices_or_msg_ids=None, block=None):
1338 def get_result(self, indices_or_msg_ids=None, block=None):
1339 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1339 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1340
1340
1341 If the client already has the results, no request to the Hub will be made.
1341 If the client already has the results, no request to the Hub will be made.
1342
1342
1343 This is a convenient way to construct AsyncResult objects, which are wrappers
1343 This is a convenient way to construct AsyncResult objects, which are wrappers
1344 that include metadata about execution, and allow for awaiting results that
1344 that include metadata about execution, and allow for awaiting results that
1345 were not submitted by this Client.
1345 were not submitted by this Client.
1346
1346
1347 It can also be a convenient way to retrieve the metadata associated with
1347 It can also be a convenient way to retrieve the metadata associated with
1348 blocking execution, since it always retrieves
1348 blocking execution, since it always retrieves
1349
1349
1350 Examples
1350 Examples
1351 --------
1351 --------
1352 ::
1352 ::
1353
1353
1354 In [10]: r = client.apply()
1354 In [10]: r = client.apply()
1355
1355
1356 Parameters
1356 Parameters
1357 ----------
1357 ----------
1358
1358
1359 indices_or_msg_ids : integer history index, str msg_id, or list of either
1359 indices_or_msg_ids : integer history index, str msg_id, or list of either
1360 The indices or msg_ids of indices to be retrieved
1360 The indices or msg_ids of indices to be retrieved
1361
1361
1362 block : bool
1362 block : bool
1363 Whether to wait for the result to be done
1363 Whether to wait for the result to be done
1364
1364
1365 Returns
1365 Returns
1366 -------
1366 -------
1367
1367
1368 AsyncResult
1368 AsyncResult
1369 A single AsyncResult object will always be returned.
1369 A single AsyncResult object will always be returned.
1370
1370
1371 AsyncHubResult
1371 AsyncHubResult
1372 A subclass of AsyncResult that retrieves results from the Hub
1372 A subclass of AsyncResult that retrieves results from the Hub
1373
1373
1374 """
1374 """
1375 block = self.block if block is None else block
1375 block = self.block if block is None else block
1376 if indices_or_msg_ids is None:
1376 if indices_or_msg_ids is None:
1377 indices_or_msg_ids = -1
1377 indices_or_msg_ids = -1
1378
1378
1379 if not isinstance(indices_or_msg_ids, (list,tuple)):
1379 if not isinstance(indices_or_msg_ids, (list,tuple)):
1380 indices_or_msg_ids = [indices_or_msg_ids]
1380 indices_or_msg_ids = [indices_or_msg_ids]
1381
1381
1382 theids = []
1382 theids = []
1383 for id in indices_or_msg_ids:
1383 for id in indices_or_msg_ids:
1384 if isinstance(id, int):
1384 if isinstance(id, int):
1385 id = self.history[id]
1385 id = self.history[id]
1386 if not isinstance(id, basestring):
1386 if not isinstance(id, basestring):
1387 raise TypeError("indices must be str or int, not %r"%id)
1387 raise TypeError("indices must be str or int, not %r"%id)
1388 theids.append(id)
1388 theids.append(id)
1389
1389
1390 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1390 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1391 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1391 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1392
1392
1393 if remote_ids:
1393 if remote_ids:
1394 ar = AsyncHubResult(self, msg_ids=theids)
1394 ar = AsyncHubResult(self, msg_ids=theids)
1395 else:
1395 else:
1396 ar = AsyncResult(self, msg_ids=theids)
1396 ar = AsyncResult(self, msg_ids=theids)
1397
1397
1398 if block:
1398 if block:
1399 ar.wait()
1399 ar.wait()
1400
1400
1401 return ar
1401 return ar
1402
1402
1403 @spin_first
1403 @spin_first
1404 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1404 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1405 """Resubmit one or more tasks.
1405 """Resubmit one or more tasks.
1406
1406
1407 in-flight tasks may not be resubmitted.
1407 in-flight tasks may not be resubmitted.
1408
1408
1409 Parameters
1409 Parameters
1410 ----------
1410 ----------
1411
1411
1412 indices_or_msg_ids : integer history index, str msg_id, or list of either
1412 indices_or_msg_ids : integer history index, str msg_id, or list of either
1413 The indices or msg_ids of indices to be retrieved
1413 The indices or msg_ids of indices to be retrieved
1414
1414
1415 block : bool
1415 block : bool
1416 Whether to wait for the result to be done
1416 Whether to wait for the result to be done
1417
1417
1418 Returns
1418 Returns
1419 -------
1419 -------
1420
1420
1421 AsyncHubResult
1421 AsyncHubResult
1422 A subclass of AsyncResult that retrieves results from the Hub
1422 A subclass of AsyncResult that retrieves results from the Hub
1423
1423
1424 """
1424 """
1425 block = self.block if block is None else block
1425 block = self.block if block is None else block
1426 if indices_or_msg_ids is None:
1426 if indices_or_msg_ids is None:
1427 indices_or_msg_ids = -1
1427 indices_or_msg_ids = -1
1428
1428
1429 if not isinstance(indices_or_msg_ids, (list,tuple)):
1429 if not isinstance(indices_or_msg_ids, (list,tuple)):
1430 indices_or_msg_ids = [indices_or_msg_ids]
1430 indices_or_msg_ids = [indices_or_msg_ids]
1431
1431
1432 theids = []
1432 theids = []
1433 for id in indices_or_msg_ids:
1433 for id in indices_or_msg_ids:
1434 if isinstance(id, int):
1434 if isinstance(id, int):
1435 id = self.history[id]
1435 id = self.history[id]
1436 if not isinstance(id, basestring):
1436 if not isinstance(id, basestring):
1437 raise TypeError("indices must be str or int, not %r"%id)
1437 raise TypeError("indices must be str or int, not %r"%id)
1438 theids.append(id)
1438 theids.append(id)
1439
1439
1440 content = dict(msg_ids = theids)
1440 content = dict(msg_ids = theids)
1441
1441
1442 self.session.send(self._query_socket, 'resubmit_request', content)
1442 self.session.send(self._query_socket, 'resubmit_request', content)
1443
1443
1444 zmq.select([self._query_socket], [], [])
1444 zmq.select([self._query_socket], [], [])
1445 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1445 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1446 if self.debug:
1446 if self.debug:
1447 pprint(msg)
1447 pprint(msg)
1448 content = msg['content']
1448 content = msg['content']
1449 if content['status'] != 'ok':
1449 if content['status'] != 'ok':
1450 raise self._unwrap_exception(content)
1450 raise self._unwrap_exception(content)
1451 mapping = content['resubmitted']
1451 mapping = content['resubmitted']
1452 new_ids = [ mapping[msg_id] for msg_id in theids ]
1452 new_ids = [ mapping[msg_id] for msg_id in theids ]
1453
1453
1454 ar = AsyncHubResult(self, msg_ids=new_ids)
1454 ar = AsyncHubResult(self, msg_ids=new_ids)
1455
1455
1456 if block:
1456 if block:
1457 ar.wait()
1457 ar.wait()
1458
1458
1459 return ar
1459 return ar
1460
1460
1461 @spin_first
1461 @spin_first
1462 def result_status(self, msg_ids, status_only=True):
1462 def result_status(self, msg_ids, status_only=True):
1463 """Check on the status of the result(s) of the apply request with `msg_ids`.
1463 """Check on the status of the result(s) of the apply request with `msg_ids`.
1464
1464
1465 If status_only is False, then the actual results will be retrieved, else
1465 If status_only is False, then the actual results will be retrieved, else
1466 only the status of the results will be checked.
1466 only the status of the results will be checked.
1467
1467
1468 Parameters
1468 Parameters
1469 ----------
1469 ----------
1470
1470
1471 msg_ids : list of msg_ids
1471 msg_ids : list of msg_ids
1472 if int:
1472 if int:
1473 Passed as index to self.history for convenience.
1473 Passed as index to self.history for convenience.
1474 status_only : bool (default: True)
1474 status_only : bool (default: True)
1475 if False:
1475 if False:
1476 Retrieve the actual results of completed tasks.
1476 Retrieve the actual results of completed tasks.
1477
1477
1478 Returns
1478 Returns
1479 -------
1479 -------
1480
1480
1481 results : dict
1481 results : dict
1482 There will always be the keys 'pending' and 'completed', which will
1482 There will always be the keys 'pending' and 'completed', which will
1483 be lists of msg_ids that are incomplete or complete. If `status_only`
1483 be lists of msg_ids that are incomplete or complete. If `status_only`
1484 is False, then completed results will be keyed by their `msg_id`.
1484 is False, then completed results will be keyed by their `msg_id`.
1485 """
1485 """
1486 if not isinstance(msg_ids, (list,tuple)):
1486 if not isinstance(msg_ids, (list,tuple)):
1487 msg_ids = [msg_ids]
1487 msg_ids = [msg_ids]
1488
1488
1489 theids = []
1489 theids = []
1490 for msg_id in msg_ids:
1490 for msg_id in msg_ids:
1491 if isinstance(msg_id, int):
1491 if isinstance(msg_id, int):
1492 msg_id = self.history[msg_id]
1492 msg_id = self.history[msg_id]
1493 if not isinstance(msg_id, basestring):
1493 if not isinstance(msg_id, basestring):
1494 raise TypeError("msg_ids must be str, not %r"%msg_id)
1494 raise TypeError("msg_ids must be str, not %r"%msg_id)
1495 theids.append(msg_id)
1495 theids.append(msg_id)
1496
1496
1497 completed = []
1497 completed = []
1498 local_results = {}
1498 local_results = {}
1499
1499
1500 # comment this block out to temporarily disable local shortcut:
1500 # comment this block out to temporarily disable local shortcut:
1501 for msg_id in theids:
1501 for msg_id in theids:
1502 if msg_id in self.results:
1502 if msg_id in self.results:
1503 completed.append(msg_id)
1503 completed.append(msg_id)
1504 local_results[msg_id] = self.results[msg_id]
1504 local_results[msg_id] = self.results[msg_id]
1505 theids.remove(msg_id)
1505 theids.remove(msg_id)
1506
1506
1507 if theids: # some not locally cached
1507 if theids: # some not locally cached
1508 content = dict(msg_ids=theids, status_only=status_only)
1508 content = dict(msg_ids=theids, status_only=status_only)
1509 msg = self.session.send(self._query_socket, "result_request", content=content)
1509 msg = self.session.send(self._query_socket, "result_request", content=content)
1510 zmq.select([self._query_socket], [], [])
1510 zmq.select([self._query_socket], [], [])
1511 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1511 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1512 if self.debug:
1512 if self.debug:
1513 pprint(msg)
1513 pprint(msg)
1514 content = msg['content']
1514 content = msg['content']
1515 if content['status'] != 'ok':
1515 if content['status'] != 'ok':
1516 raise self._unwrap_exception(content)
1516 raise self._unwrap_exception(content)
1517 buffers = msg['buffers']
1517 buffers = msg['buffers']
1518 else:
1518 else:
1519 content = dict(completed=[],pending=[])
1519 content = dict(completed=[],pending=[])
1520
1520
1521 content['completed'].extend(completed)
1521 content['completed'].extend(completed)
1522
1522
1523 if status_only:
1523 if status_only:
1524 return content
1524 return content
1525
1525
1526 failures = []
1526 failures = []
1527 # load cached results into result:
1527 # load cached results into result:
1528 content.update(local_results)
1528 content.update(local_results)
1529
1529
1530 # update cache with results:
1530 # update cache with results:
1531 for msg_id in sorted(theids):
1531 for msg_id in sorted(theids):
1532 if msg_id in content['completed']:
1532 if msg_id in content['completed']:
1533 rec = content[msg_id]
1533 rec = content[msg_id]
1534 parent = rec['header']
1534 parent = rec['header']
1535 header = rec['result_header']
1535 header = rec['result_header']
1536 rcontent = rec['result_content']
1536 rcontent = rec['result_content']
1537 iodict = rec['io']
1537 iodict = rec['io']
1538 if isinstance(rcontent, str):
1538 if isinstance(rcontent, str):
1539 rcontent = self.session.unpack(rcontent)
1539 rcontent = self.session.unpack(rcontent)
1540
1540
1541 md = self.metadata[msg_id]
1541 md = self.metadata[msg_id]
1542 md.update(self._extract_metadata(header, parent, rcontent))
1542 md.update(self._extract_metadata(header, parent, rcontent))
1543 if rec.get('received'):
1543 if rec.get('received'):
1544 md['received'] = rec['received']
1544 md['received'] = rec['received']
1545 md.update(iodict)
1545 md.update(iodict)
1546
1546
1547 if rcontent['status'] == 'ok':
1547 if rcontent['status'] == 'ok':
1548 res,buffers = util.unserialize_object(buffers)
1548 if header['msg_type'] == 'apply_reply':
1549 res,buffers = util.unserialize_object(buffers)
1550 elif header['msg_type'] == 'execute_reply':
1551 res = ExecuteReply(msg_id, rcontent, md)
1552 else:
1553 raise KeyError("unhandled msg type: %r" % header[msg_type])
1549 else:
1554 else:
1550 print rcontent
1551 res = self._unwrap_exception(rcontent)
1555 res = self._unwrap_exception(rcontent)
1552 failures.append(res)
1556 failures.append(res)
1553
1557
1554 self.results[msg_id] = res
1558 self.results[msg_id] = res
1555 content[msg_id] = res
1559 content[msg_id] = res
1556
1560
1557 if len(theids) == 1 and failures:
1561 if len(theids) == 1 and failures:
1558 raise failures[0]
1562 raise failures[0]
1559
1563
1560 error.collect_exceptions(failures, "result_status")
1564 error.collect_exceptions(failures, "result_status")
1561 return content
1565 return content
1562
1566
1563 @spin_first
1567 @spin_first
1564 def queue_status(self, targets='all', verbose=False):
1568 def queue_status(self, targets='all', verbose=False):
1565 """Fetch the status of engine queues.
1569 """Fetch the status of engine queues.
1566
1570
1567 Parameters
1571 Parameters
1568 ----------
1572 ----------
1569
1573
1570 targets : int/str/list of ints/strs
1574 targets : int/str/list of ints/strs
1571 the engines whose states are to be queried.
1575 the engines whose states are to be queried.
1572 default : all
1576 default : all
1573 verbose : bool
1577 verbose : bool
1574 Whether to return lengths only, or lists of ids for each element
1578 Whether to return lengths only, or lists of ids for each element
1575 """
1579 """
1576 if targets == 'all':
1580 if targets == 'all':
1577 # allow 'all' to be evaluated on the engine
1581 # allow 'all' to be evaluated on the engine
1578 engine_ids = None
1582 engine_ids = None
1579 else:
1583 else:
1580 engine_ids = self._build_targets(targets)[1]
1584 engine_ids = self._build_targets(targets)[1]
1581 content = dict(targets=engine_ids, verbose=verbose)
1585 content = dict(targets=engine_ids, verbose=verbose)
1582 self.session.send(self._query_socket, "queue_request", content=content)
1586 self.session.send(self._query_socket, "queue_request", content=content)
1583 idents,msg = self.session.recv(self._query_socket, 0)
1587 idents,msg = self.session.recv(self._query_socket, 0)
1584 if self.debug:
1588 if self.debug:
1585 pprint(msg)
1589 pprint(msg)
1586 content = msg['content']
1590 content = msg['content']
1587 status = content.pop('status')
1591 status = content.pop('status')
1588 if status != 'ok':
1592 if status != 'ok':
1589 raise self._unwrap_exception(content)
1593 raise self._unwrap_exception(content)
1590 content = rekey(content)
1594 content = rekey(content)
1591 if isinstance(targets, int):
1595 if isinstance(targets, int):
1592 return content[targets]
1596 return content[targets]
1593 else:
1597 else:
1594 return content
1598 return content
1595
1599
1596 @spin_first
1600 @spin_first
1597 def purge_results(self, jobs=[], targets=[]):
1601 def purge_results(self, jobs=[], targets=[]):
1598 """Tell the Hub to forget results.
1602 """Tell the Hub to forget results.
1599
1603
1600 Individual results can be purged by msg_id, or the entire
1604 Individual results can be purged by msg_id, or the entire
1601 history of specific targets can be purged.
1605 history of specific targets can be purged.
1602
1606
1603 Use `purge_results('all')` to scrub everything from the Hub's db.
1607 Use `purge_results('all')` to scrub everything from the Hub's db.
1604
1608
1605 Parameters
1609 Parameters
1606 ----------
1610 ----------
1607
1611
1608 jobs : str or list of str or AsyncResult objects
1612 jobs : str or list of str or AsyncResult objects
1609 the msg_ids whose results should be forgotten.
1613 the msg_ids whose results should be forgotten.
1610 targets : int/str/list of ints/strs
1614 targets : int/str/list of ints/strs
1611 The targets, by int_id, whose entire history is to be purged.
1615 The targets, by int_id, whose entire history is to be purged.
1612
1616
1613 default : None
1617 default : None
1614 """
1618 """
1615 if not targets and not jobs:
1619 if not targets and not jobs:
1616 raise ValueError("Must specify at least one of `targets` and `jobs`")
1620 raise ValueError("Must specify at least one of `targets` and `jobs`")
1617 if targets:
1621 if targets:
1618 targets = self._build_targets(targets)[1]
1622 targets = self._build_targets(targets)[1]
1619
1623
1620 # construct msg_ids from jobs
1624 # construct msg_ids from jobs
1621 if jobs == 'all':
1625 if jobs == 'all':
1622 msg_ids = jobs
1626 msg_ids = jobs
1623 else:
1627 else:
1624 msg_ids = []
1628 msg_ids = []
1625 if isinstance(jobs, (basestring,AsyncResult)):
1629 if isinstance(jobs, (basestring,AsyncResult)):
1626 jobs = [jobs]
1630 jobs = [jobs]
1627 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1631 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1628 if bad_ids:
1632 if bad_ids:
1629 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1633 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1630 for j in jobs:
1634 for j in jobs:
1631 if isinstance(j, AsyncResult):
1635 if isinstance(j, AsyncResult):
1632 msg_ids.extend(j.msg_ids)
1636 msg_ids.extend(j.msg_ids)
1633 else:
1637 else:
1634 msg_ids.append(j)
1638 msg_ids.append(j)
1635
1639
1636 content = dict(engine_ids=targets, msg_ids=msg_ids)
1640 content = dict(engine_ids=targets, msg_ids=msg_ids)
1637 self.session.send(self._query_socket, "purge_request", content=content)
1641 self.session.send(self._query_socket, "purge_request", content=content)
1638 idents, msg = self.session.recv(self._query_socket, 0)
1642 idents, msg = self.session.recv(self._query_socket, 0)
1639 if self.debug:
1643 if self.debug:
1640 pprint(msg)
1644 pprint(msg)
1641 content = msg['content']
1645 content = msg['content']
1642 if content['status'] != 'ok':
1646 if content['status'] != 'ok':
1643 raise self._unwrap_exception(content)
1647 raise self._unwrap_exception(content)
1644
1648
1645 @spin_first
1649 @spin_first
1646 def hub_history(self):
1650 def hub_history(self):
1647 """Get the Hub's history
1651 """Get the Hub's history
1648
1652
1649 Just like the Client, the Hub has a history, which is a list of msg_ids.
1653 Just like the Client, the Hub has a history, which is a list of msg_ids.
1650 This will contain the history of all clients, and, depending on configuration,
1654 This will contain the history of all clients, and, depending on configuration,
1651 may contain history across multiple cluster sessions.
1655 may contain history across multiple cluster sessions.
1652
1656
1653 Any msg_id returned here is a valid argument to `get_result`.
1657 Any msg_id returned here is a valid argument to `get_result`.
1654
1658
1655 Returns
1659 Returns
1656 -------
1660 -------
1657
1661
1658 msg_ids : list of strs
1662 msg_ids : list of strs
1659 list of all msg_ids, ordered by task submission time.
1663 list of all msg_ids, ordered by task submission time.
1660 """
1664 """
1661
1665
1662 self.session.send(self._query_socket, "history_request", content={})
1666 self.session.send(self._query_socket, "history_request", content={})
1663 idents, msg = self.session.recv(self._query_socket, 0)
1667 idents, msg = self.session.recv(self._query_socket, 0)
1664
1668
1665 if self.debug:
1669 if self.debug:
1666 pprint(msg)
1670 pprint(msg)
1667 content = msg['content']
1671 content = msg['content']
1668 if content['status'] != 'ok':
1672 if content['status'] != 'ok':
1669 raise self._unwrap_exception(content)
1673 raise self._unwrap_exception(content)
1670 else:
1674 else:
1671 return content['history']
1675 return content['history']
1672
1676
1673 @spin_first
1677 @spin_first
1674 def db_query(self, query, keys=None):
1678 def db_query(self, query, keys=None):
1675 """Query the Hub's TaskRecord database
1679 """Query the Hub's TaskRecord database
1676
1680
1677 This will return a list of task record dicts that match `query`
1681 This will return a list of task record dicts that match `query`
1678
1682
1679 Parameters
1683 Parameters
1680 ----------
1684 ----------
1681
1685
1682 query : mongodb query dict
1686 query : mongodb query dict
1683 The search dict. See mongodb query docs for details.
1687 The search dict. See mongodb query docs for details.
1684 keys : list of strs [optional]
1688 keys : list of strs [optional]
1685 The subset of keys to be returned. The default is to fetch everything but buffers.
1689 The subset of keys to be returned. The default is to fetch everything but buffers.
1686 'msg_id' will *always* be included.
1690 'msg_id' will *always* be included.
1687 """
1691 """
1688 if isinstance(keys, basestring):
1692 if isinstance(keys, basestring):
1689 keys = [keys]
1693 keys = [keys]
1690 content = dict(query=query, keys=keys)
1694 content = dict(query=query, keys=keys)
1691 self.session.send(self._query_socket, "db_request", content=content)
1695 self.session.send(self._query_socket, "db_request", content=content)
1692 idents, msg = self.session.recv(self._query_socket, 0)
1696 idents, msg = self.session.recv(self._query_socket, 0)
1693 if self.debug:
1697 if self.debug:
1694 pprint(msg)
1698 pprint(msg)
1695 content = msg['content']
1699 content = msg['content']
1696 if content['status'] != 'ok':
1700 if content['status'] != 'ok':
1697 raise self._unwrap_exception(content)
1701 raise self._unwrap_exception(content)
1698
1702
1699 records = content['records']
1703 records = content['records']
1700
1704
1701 buffer_lens = content['buffer_lens']
1705 buffer_lens = content['buffer_lens']
1702 result_buffer_lens = content['result_buffer_lens']
1706 result_buffer_lens = content['result_buffer_lens']
1703 buffers = msg['buffers']
1707 buffers = msg['buffers']
1704 has_bufs = buffer_lens is not None
1708 has_bufs = buffer_lens is not None
1705 has_rbufs = result_buffer_lens is not None
1709 has_rbufs = result_buffer_lens is not None
1706 for i,rec in enumerate(records):
1710 for i,rec in enumerate(records):
1707 # relink buffers
1711 # relink buffers
1708 if has_bufs:
1712 if has_bufs:
1709 blen = buffer_lens[i]
1713 blen = buffer_lens[i]
1710 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1714 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1711 if has_rbufs:
1715 if has_rbufs:
1712 blen = result_buffer_lens[i]
1716 blen = result_buffer_lens[i]
1713 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1717 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1714
1718
1715 return records
1719 return records
1716
1720
1717 __all__ = [ 'Client' ]
1721 __all__ = [ 'Client' ]
@@ -1,1314 +1,1321 b''
1 """The IPython Controller Hub with 0MQ
1 """The IPython Controller Hub with 0MQ
2 This is the master object that handles connections from engines and clients,
2 This is the master object that handles connections from engines and clients,
3 and monitors traffic through the various queues.
3 and monitors traffic through the various queues.
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 """
8 """
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2010-2011 The IPython Development Team
10 # Copyright (C) 2010-2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 from __future__ import print_function
19 from __future__ import print_function
20
20
21 import sys
21 import sys
22 import time
22 import time
23 from datetime import datetime
23 from datetime import datetime
24
24
25 import zmq
25 import zmq
26 from zmq.eventloop import ioloop
26 from zmq.eventloop import ioloop
27 from zmq.eventloop.zmqstream import ZMQStream
27 from zmq.eventloop.zmqstream import ZMQStream
28
28
29 # internal:
29 # internal:
30 from IPython.utils.importstring import import_item
30 from IPython.utils.importstring import import_item
31 from IPython.utils.py3compat import cast_bytes
31 from IPython.utils.py3compat import cast_bytes
32 from IPython.utils.traitlets import (
32 from IPython.utils.traitlets import (
33 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
33 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
34 )
34 )
35
35
36 from IPython.parallel import error, util
36 from IPython.parallel import error, util
37 from IPython.parallel.factory import RegistrationFactory
37 from IPython.parallel.factory import RegistrationFactory
38
38
39 from IPython.zmq.session import SessionFactory
39 from IPython.zmq.session import SessionFactory
40
40
41 from .heartmonitor import HeartMonitor
41 from .heartmonitor import HeartMonitor
42
42
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44 # Code
44 # Code
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46
46
47 def _passer(*args, **kwargs):
47 def _passer(*args, **kwargs):
48 return
48 return
49
49
50 def _printer(*args, **kwargs):
50 def _printer(*args, **kwargs):
51 print (args)
51 print (args)
52 print (kwargs)
52 print (kwargs)
53
53
54 def empty_record():
54 def empty_record():
55 """Return an empty dict with all record keys."""
55 """Return an empty dict with all record keys."""
56 return {
56 return {
57 'msg_id' : None,
57 'msg_id' : None,
58 'header' : None,
58 'header' : None,
59 'content': None,
59 'content': None,
60 'buffers': None,
60 'buffers': None,
61 'submitted': None,
61 'submitted': None,
62 'client_uuid' : None,
62 'client_uuid' : None,
63 'engine_uuid' : None,
63 'engine_uuid' : None,
64 'started': None,
64 'started': None,
65 'completed': None,
65 'completed': None,
66 'resubmitted': None,
66 'resubmitted': None,
67 'received': None,
67 'received': None,
68 'result_header' : None,
68 'result_header' : None,
69 'result_content' : None,
69 'result_content' : None,
70 'result_buffers' : None,
70 'result_buffers' : None,
71 'queue' : None,
71 'queue' : None,
72 'pyin' : None,
72 'pyin' : None,
73 'pyout': None,
73 'pyout': None,
74 'pyerr': None,
74 'pyerr': None,
75 'stdout': '',
75 'stdout': '',
76 'stderr': '',
76 'stderr': '',
77 }
77 }
78
78
79 def init_record(msg):
79 def init_record(msg):
80 """Initialize a TaskRecord based on a request."""
80 """Initialize a TaskRecord based on a request."""
81 header = msg['header']
81 header = msg['header']
82 return {
82 return {
83 'msg_id' : header['msg_id'],
83 'msg_id' : header['msg_id'],
84 'header' : header,
84 'header' : header,
85 'content': msg['content'],
85 'content': msg['content'],
86 'buffers': msg['buffers'],
86 'buffers': msg['buffers'],
87 'submitted': header['date'],
87 'submitted': header['date'],
88 'client_uuid' : None,
88 'client_uuid' : None,
89 'engine_uuid' : None,
89 'engine_uuid' : None,
90 'started': None,
90 'started': None,
91 'completed': None,
91 'completed': None,
92 'resubmitted': None,
92 'resubmitted': None,
93 'received': None,
93 'received': None,
94 'result_header' : None,
94 'result_header' : None,
95 'result_content' : None,
95 'result_content' : None,
96 'result_buffers' : None,
96 'result_buffers' : None,
97 'queue' : None,
97 'queue' : None,
98 'pyin' : None,
98 'pyin' : None,
99 'pyout': None,
99 'pyout': None,
100 'pyerr': None,
100 'pyerr': None,
101 'stdout': '',
101 'stdout': '',
102 'stderr': '',
102 'stderr': '',
103 }
103 }
104
104
105
105
106 class EngineConnector(HasTraits):
106 class EngineConnector(HasTraits):
107 """A simple object for accessing the various zmq connections of an object.
107 """A simple object for accessing the various zmq connections of an object.
108 Attributes are:
108 Attributes are:
109 id (int): engine ID
109 id (int): engine ID
110 uuid (str): uuid (unused?)
110 uuid (str): uuid (unused?)
111 queue (str): identity of queue's XREQ socket
111 queue (str): identity of queue's XREQ socket
112 registration (str): identity of registration XREQ socket
112 registration (str): identity of registration XREQ socket
113 heartbeat (str): identity of heartbeat XREQ socket
113 heartbeat (str): identity of heartbeat XREQ socket
114 """
114 """
115 id=Integer(0)
115 id=Integer(0)
116 queue=CBytes()
116 queue=CBytes()
117 control=CBytes()
117 control=CBytes()
118 registration=CBytes()
118 registration=CBytes()
119 heartbeat=CBytes()
119 heartbeat=CBytes()
120 pending=Set()
120 pending=Set()
121
121
122 class HubFactory(RegistrationFactory):
122 class HubFactory(RegistrationFactory):
123 """The Configurable for setting up a Hub."""
123 """The Configurable for setting up a Hub."""
124
124
125 # port-pairs for monitoredqueues:
125 # port-pairs for monitoredqueues:
126 hb = Tuple(Integer,Integer,config=True,
126 hb = Tuple(Integer,Integer,config=True,
127 help="""XREQ/SUB Port pair for Engine heartbeats""")
127 help="""XREQ/SUB Port pair for Engine heartbeats""")
128 def _hb_default(self):
128 def _hb_default(self):
129 return tuple(util.select_random_ports(2))
129 return tuple(util.select_random_ports(2))
130
130
131 mux = Tuple(Integer,Integer,config=True,
131 mux = Tuple(Integer,Integer,config=True,
132 help="""Engine/Client Port pair for MUX queue""")
132 help="""Engine/Client Port pair for MUX queue""")
133
133
134 def _mux_default(self):
134 def _mux_default(self):
135 return tuple(util.select_random_ports(2))
135 return tuple(util.select_random_ports(2))
136
136
137 task = Tuple(Integer,Integer,config=True,
137 task = Tuple(Integer,Integer,config=True,
138 help="""Engine/Client Port pair for Task queue""")
138 help="""Engine/Client Port pair for Task queue""")
139 def _task_default(self):
139 def _task_default(self):
140 return tuple(util.select_random_ports(2))
140 return tuple(util.select_random_ports(2))
141
141
142 control = Tuple(Integer,Integer,config=True,
142 control = Tuple(Integer,Integer,config=True,
143 help="""Engine/Client Port pair for Control queue""")
143 help="""Engine/Client Port pair for Control queue""")
144
144
145 def _control_default(self):
145 def _control_default(self):
146 return tuple(util.select_random_ports(2))
146 return tuple(util.select_random_ports(2))
147
147
148 iopub = Tuple(Integer,Integer,config=True,
148 iopub = Tuple(Integer,Integer,config=True,
149 help="""Engine/Client Port pair for IOPub relay""")
149 help="""Engine/Client Port pair for IOPub relay""")
150
150
151 def _iopub_default(self):
151 def _iopub_default(self):
152 return tuple(util.select_random_ports(2))
152 return tuple(util.select_random_ports(2))
153
153
154 # single ports:
154 # single ports:
155 mon_port = Integer(config=True,
155 mon_port = Integer(config=True,
156 help="""Monitor (SUB) port for queue traffic""")
156 help="""Monitor (SUB) port for queue traffic""")
157
157
158 def _mon_port_default(self):
158 def _mon_port_default(self):
159 return util.select_random_ports(1)[0]
159 return util.select_random_ports(1)[0]
160
160
161 notifier_port = Integer(config=True,
161 notifier_port = Integer(config=True,
162 help="""PUB port for sending engine status notifications""")
162 help="""PUB port for sending engine status notifications""")
163
163
164 def _notifier_port_default(self):
164 def _notifier_port_default(self):
165 return util.select_random_ports(1)[0]
165 return util.select_random_ports(1)[0]
166
166
167 engine_ip = Unicode('127.0.0.1', config=True,
167 engine_ip = Unicode('127.0.0.1', config=True,
168 help="IP on which to listen for engine connections. [default: loopback]")
168 help="IP on which to listen for engine connections. [default: loopback]")
169 engine_transport = Unicode('tcp', config=True,
169 engine_transport = Unicode('tcp', config=True,
170 help="0MQ transport for engine connections. [default: tcp]")
170 help="0MQ transport for engine connections. [default: tcp]")
171
171
172 client_ip = Unicode('127.0.0.1', config=True,
172 client_ip = Unicode('127.0.0.1', config=True,
173 help="IP on which to listen for client connections. [default: loopback]")
173 help="IP on which to listen for client connections. [default: loopback]")
174 client_transport = Unicode('tcp', config=True,
174 client_transport = Unicode('tcp', config=True,
175 help="0MQ transport for client connections. [default : tcp]")
175 help="0MQ transport for client connections. [default : tcp]")
176
176
177 monitor_ip = Unicode('127.0.0.1', config=True,
177 monitor_ip = Unicode('127.0.0.1', config=True,
178 help="IP on which to listen for monitor messages. [default: loopback]")
178 help="IP on which to listen for monitor messages. [default: loopback]")
179 monitor_transport = Unicode('tcp', config=True,
179 monitor_transport = Unicode('tcp', config=True,
180 help="0MQ transport for monitor messages. [default : tcp]")
180 help="0MQ transport for monitor messages. [default : tcp]")
181
181
182 monitor_url = Unicode('')
182 monitor_url = Unicode('')
183
183
184 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
184 db_class = DottedObjectName('IPython.parallel.controller.dictdb.DictDB',
185 config=True, help="""The class to use for the DB backend""")
185 config=True, help="""The class to use for the DB backend""")
186
186
187 # not configurable
187 # not configurable
188 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
188 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
189 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
189 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
190
190
191 def _ip_changed(self, name, old, new):
191 def _ip_changed(self, name, old, new):
192 self.engine_ip = new
192 self.engine_ip = new
193 self.client_ip = new
193 self.client_ip = new
194 self.monitor_ip = new
194 self.monitor_ip = new
195 self._update_monitor_url()
195 self._update_monitor_url()
196
196
197 def _update_monitor_url(self):
197 def _update_monitor_url(self):
198 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
198 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
199
199
200 def _transport_changed(self, name, old, new):
200 def _transport_changed(self, name, old, new):
201 self.engine_transport = new
201 self.engine_transport = new
202 self.client_transport = new
202 self.client_transport = new
203 self.monitor_transport = new
203 self.monitor_transport = new
204 self._update_monitor_url()
204 self._update_monitor_url()
205
205
206 def __init__(self, **kwargs):
206 def __init__(self, **kwargs):
207 super(HubFactory, self).__init__(**kwargs)
207 super(HubFactory, self).__init__(**kwargs)
208 self._update_monitor_url()
208 self._update_monitor_url()
209
209
210
210
211 def construct(self):
211 def construct(self):
212 self.init_hub()
212 self.init_hub()
213
213
214 def start(self):
214 def start(self):
215 self.heartmonitor.start()
215 self.heartmonitor.start()
216 self.log.info("Heartmonitor started")
216 self.log.info("Heartmonitor started")
217
217
218 def init_hub(self):
218 def init_hub(self):
219 """construct"""
219 """construct"""
220 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
220 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
221 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
221 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
222
222
223 ctx = self.context
223 ctx = self.context
224 loop = self.loop
224 loop = self.loop
225
225
226 # Registrar socket
226 # Registrar socket
227 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
227 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
228 q.bind(client_iface % self.regport)
228 q.bind(client_iface % self.regport)
229 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
229 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
230 if self.client_ip != self.engine_ip:
230 if self.client_ip != self.engine_ip:
231 q.bind(engine_iface % self.regport)
231 q.bind(engine_iface % self.regport)
232 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
232 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
233
233
234 ### Engine connections ###
234 ### Engine connections ###
235
235
236 # heartbeat
236 # heartbeat
237 hpub = ctx.socket(zmq.PUB)
237 hpub = ctx.socket(zmq.PUB)
238 hpub.bind(engine_iface % self.hb[0])
238 hpub.bind(engine_iface % self.hb[0])
239 hrep = ctx.socket(zmq.ROUTER)
239 hrep = ctx.socket(zmq.ROUTER)
240 hrep.bind(engine_iface % self.hb[1])
240 hrep.bind(engine_iface % self.hb[1])
241 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
241 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
242 pingstream=ZMQStream(hpub,loop),
242 pingstream=ZMQStream(hpub,loop),
243 pongstream=ZMQStream(hrep,loop)
243 pongstream=ZMQStream(hrep,loop)
244 )
244 )
245
245
246 ### Client connections ###
246 ### Client connections ###
247 # Notifier socket
247 # Notifier socket
248 n = ZMQStream(ctx.socket(zmq.PUB), loop)
248 n = ZMQStream(ctx.socket(zmq.PUB), loop)
249 n.bind(client_iface%self.notifier_port)
249 n.bind(client_iface%self.notifier_port)
250
250
251 ### build and launch the queues ###
251 ### build and launch the queues ###
252
252
253 # monitor socket
253 # monitor socket
254 sub = ctx.socket(zmq.SUB)
254 sub = ctx.socket(zmq.SUB)
255 sub.setsockopt(zmq.SUBSCRIBE, b"")
255 sub.setsockopt(zmq.SUBSCRIBE, b"")
256 sub.bind(self.monitor_url)
256 sub.bind(self.monitor_url)
257 sub.bind('inproc://monitor')
257 sub.bind('inproc://monitor')
258 sub = ZMQStream(sub, loop)
258 sub = ZMQStream(sub, loop)
259
259
260 # connect the db
260 # connect the db
261 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
261 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
262 # cdir = self.config.Global.cluster_dir
262 # cdir = self.config.Global.cluster_dir
263 self.db = import_item(str(self.db_class))(session=self.session.session,
263 self.db = import_item(str(self.db_class))(session=self.session.session,
264 config=self.config, log=self.log)
264 config=self.config, log=self.log)
265 time.sleep(.25)
265 time.sleep(.25)
266 try:
266 try:
267 scheme = self.config.TaskScheduler.scheme_name
267 scheme = self.config.TaskScheduler.scheme_name
268 except AttributeError:
268 except AttributeError:
269 from .scheduler import TaskScheduler
269 from .scheduler import TaskScheduler
270 scheme = TaskScheduler.scheme_name.get_default_value()
270 scheme = TaskScheduler.scheme_name.get_default_value()
271 # build connection dicts
271 # build connection dicts
272 self.engine_info = {
272 self.engine_info = {
273 'control' : engine_iface%self.control[1],
273 'control' : engine_iface%self.control[1],
274 'mux': engine_iface%self.mux[1],
274 'mux': engine_iface%self.mux[1],
275 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
275 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
276 'task' : engine_iface%self.task[1],
276 'task' : engine_iface%self.task[1],
277 'iopub' : engine_iface%self.iopub[1],
277 'iopub' : engine_iface%self.iopub[1],
278 # 'monitor' : engine_iface%self.mon_port,
278 # 'monitor' : engine_iface%self.mon_port,
279 }
279 }
280
280
281 self.client_info = {
281 self.client_info = {
282 'control' : client_iface%self.control[0],
282 'control' : client_iface%self.control[0],
283 'mux': client_iface%self.mux[0],
283 'mux': client_iface%self.mux[0],
284 'task' : (scheme, client_iface%self.task[0]),
284 'task' : (scheme, client_iface%self.task[0]),
285 'iopub' : client_iface%self.iopub[0],
285 'iopub' : client_iface%self.iopub[0],
286 'notification': client_iface%self.notifier_port
286 'notification': client_iface%self.notifier_port
287 }
287 }
288 self.log.debug("Hub engine addrs: %s", self.engine_info)
288 self.log.debug("Hub engine addrs: %s", self.engine_info)
289 self.log.debug("Hub client addrs: %s", self.client_info)
289 self.log.debug("Hub client addrs: %s", self.client_info)
290
290
291 # resubmit stream
291 # resubmit stream
292 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
292 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
293 url = util.disambiguate_url(self.client_info['task'][-1])
293 url = util.disambiguate_url(self.client_info['task'][-1])
294 r.setsockopt(zmq.IDENTITY, self.session.bsession)
294 r.setsockopt(zmq.IDENTITY, self.session.bsession)
295 r.connect(url)
295 r.connect(url)
296
296
297 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
297 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
298 query=q, notifier=n, resubmit=r, db=self.db,
298 query=q, notifier=n, resubmit=r, db=self.db,
299 engine_info=self.engine_info, client_info=self.client_info,
299 engine_info=self.engine_info, client_info=self.client_info,
300 log=self.log)
300 log=self.log)
301
301
302
302
class Hub(SessionFactory):
    """The IPython Controller Hub with 0MQ connections

    Parameters
    ==========
    loop: zmq IOLoop instance
    session: Session object
    <removed> context: zmq context for creating new connections (?)
    queue: ZMQStream for monitoring the command queue (SUB)
    query: ZMQStream for engine registration and client queries requests (XREP)
    heartbeat: HeartMonitor object checking the pulse of the engines
    notifier: ZMQStream for broadcasting engine registration changes (PUB)
    db: connection to db for out of memory logging of commands
    NotImplemented
    engine_info: dict of zmq connection information for engines to connect
    to the queues.
    client_info: dict of zmq connection information for engines to connect
    to the queues.
    """
    # internal data structures:
    ids=Set() # engine IDs
    keytable=Dict() # engine id -> queue identity (used to check dead_engines)
    by_ident=Dict() # queue identity (bytes) -> engine id
    engines=Dict() # engine id -> engine connector (has a .queue attribute)
    clients=Dict() # client connections -- NOTE(review): unused in this view; verify
    hearts=Dict() # heart identity -> engine id
    pending=Set() # msg_ids submitted but not yet completed
    queues=Dict() # pending msg_ids keyed by engine_id
    tasks=Dict() # pending task msg_ids keyed by engine_id (indexed by eid in save_task_result)
    completed=Dict() # completed msg_ids keyed by engine_id
    all_completed=Set() # set of every completed msg_id, regardless of engine
    dead_engines=Set() # identities of engines that have been unregistered
    unassigned=Set() # set of task msg_ds not yet assigned a destination
    incoming_registrations=Dict() # heart -> in-progress registration, finished on first beat
    registration_timeout=Integer() # ms allowed for an engine to complete registration
    _idcounter=Integer(0) # monotonically increasing source for _next_id

    # objects from constructor:
    query=Instance(ZMQStream)
    monitor=Instance(ZMQStream)
    notifier=Instance(ZMQStream)
    resubmit=Instance(ZMQStream)
    heartmonitor=Instance(HeartMonitor)
    db=Instance(object)
    client_info=Dict()
    engine_info=Dict()

    def __init__(self, **kwargs):
        """
        # universal:
        loop: IOLoop for creating future connections
        session: streamsession for sending serialized data
        # engine:
        queue: ZMQStream for monitoring queue messages
        query: ZMQStream for engine+client registration and client requests
        heartbeat: HeartMonitor object for tracking engines
        # extra:
        db: ZMQStream for db connection (NotImplemented)
        engine_info: zmq address/protocol dict for engine connections
        client_info: zmq address/protocol dict for client connections
        """

        super(Hub, self).__init__(**kwargs)
        # allow at least two heartbeat periods (never under 5s) before an
        # incomplete registration is abandoned
        self.registration_timeout = max(5000, 2*self.heartmonitor.period)

        # validate connection dicts:
        for k,v in self.client_info.iteritems():
            if k == 'task':
                # the 'task' entry is a (scheme, url) pair; only the url is validated
                util.validate_url_container(v[1])
            else:
                util.validate_url_container(v)
        # util.validate_url_container(self.client_info)
        util.validate_url_container(self.engine_info)

        # register our callbacks
        self.query.on_recv(self.dispatch_query)
        self.monitor.on_recv(self.dispatch_monitor_traffic)

        self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
        self.heartmonitor.add_new_heart_handler(self.handle_new_heart)

        # topic frame (bytes) on the monitor socket -> handler
        self.monitor_handlers = {b'in' : self.save_queue_request,
                                b'out': self.save_queue_result,
                                b'intask': self.save_task_request,
                                b'outtask': self.save_task_result,
                                b'tracktask': self.save_task_destination,
                                b'incontrol': _passer,
                                b'outcontrol': _passer,
                                b'iopub': self.save_iopub_message,
        }

        # client-request msg_type -> handler, used by dispatch_query
        self.query_handlers = {'queue_request': self.queue_status,
                                'result_request': self.get_results,
                                'history_request': self.get_history,
                                'db_request': self.db_query,
                                'purge_request': self.purge_results,
                                'load_request': self.check_load,
                                'resubmit_request': self.resubmit_task,
                                'shutdown_request': self.shutdown_request,
                                'registration_request' : self.register_engine,
                                'unregistration_request' : self.unregister_engine,
                                'connection_request': self.connection_request,
                                }

        # ignore resubmit replies
        self.resubmit.on_recv(lambda msg: None, copy=False)

        self.log.info("hub::created hub")
412
412
    @property
    def _next_id(self):
        """generate a new ID.

        No longer reuse old ids, just count from 0."""
        newid = self._idcounter
        self._idcounter += 1
        return newid
        # Legacy id-reuse logic, kept for reference:
        # newid = 0
        # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
        # # print newid, self.ids, self.incoming_registrations
        # while newid in self.ids or newid in incoming:
        #     newid += 1
        # return newid
427
427
428 #-----------------------------------------------------------------------------
428 #-----------------------------------------------------------------------------
429 # message validation
429 # message validation
430 #-----------------------------------------------------------------------------
430 #-----------------------------------------------------------------------------
431
431
432 def _validate_targets(self, targets):
432 def _validate_targets(self, targets):
433 """turn any valid targets argument into a list of integer ids"""
433 """turn any valid targets argument into a list of integer ids"""
434 if targets is None:
434 if targets is None:
435 # default to all
435 # default to all
436 return self.ids
436 return self.ids
437
437
438 if isinstance(targets, (int,str,unicode)):
438 if isinstance(targets, (int,str,unicode)):
439 # only one target specified
439 # only one target specified
440 targets = [targets]
440 targets = [targets]
441 _targets = []
441 _targets = []
442 for t in targets:
442 for t in targets:
443 # map raw identities to ids
443 # map raw identities to ids
444 if isinstance(t, (str,unicode)):
444 if isinstance(t, (str,unicode)):
445 t = self.by_ident.get(cast_bytes(t), t)
445 t = self.by_ident.get(cast_bytes(t), t)
446 _targets.append(t)
446 _targets.append(t)
447 targets = _targets
447 targets = _targets
448 bad_targets = [ t for t in targets if t not in self.ids ]
448 bad_targets = [ t for t in targets if t not in self.ids ]
449 if bad_targets:
449 if bad_targets:
450 raise IndexError("No Such Engine: %r" % bad_targets)
450 raise IndexError("No Such Engine: %r" % bad_targets)
451 if not targets:
451 if not targets:
452 raise IndexError("No Engines Registered")
452 raise IndexError("No Engines Registered")
453 return targets
453 return targets
454
454
455 #-----------------------------------------------------------------------------
455 #-----------------------------------------------------------------------------
456 # dispatch methods (1 per stream)
456 # dispatch methods (1 per stream)
457 #-----------------------------------------------------------------------------
457 #-----------------------------------------------------------------------------
458
458
459
459
460 @util.log_errors
460 @util.log_errors
461 def dispatch_monitor_traffic(self, msg):
461 def dispatch_monitor_traffic(self, msg):
462 """all ME and Task queue messages come through here, as well as
462 """all ME and Task queue messages come through here, as well as
463 IOPub traffic."""
463 IOPub traffic."""
464 self.log.debug("monitor traffic: %r", msg[0])
464 self.log.debug("monitor traffic: %r", msg[0])
465 switch = msg[0]
465 switch = msg[0]
466 try:
466 try:
467 idents, msg = self.session.feed_identities(msg[1:])
467 idents, msg = self.session.feed_identities(msg[1:])
468 except ValueError:
468 except ValueError:
469 idents=[]
469 idents=[]
470 if not idents:
470 if not idents:
471 self.log.error("Monitor message without topic: %r", msg)
471 self.log.error("Monitor message without topic: %r", msg)
472 return
472 return
473 handler = self.monitor_handlers.get(switch, None)
473 handler = self.monitor_handlers.get(switch, None)
474 if handler is not None:
474 if handler is not None:
475 handler(idents, msg)
475 handler(idents, msg)
476 else:
476 else:
477 self.log.error("Unrecognized monitor topic: %r", switch)
477 self.log.error("Unrecognized monitor topic: %r", switch)
478
478
479
479
    @util.log_errors
    def dispatch_query(self, msg):
        """Route registration requests and queries from clients."""
        try:
            idents, msg = self.session.feed_identities(msg)
        except ValueError:
            idents = []
        if not idents:
            self.log.error("Bad Query Message: %r", msg)
            return
        # first identity frame is the requesting client
        client_id = idents[0]
        try:
            msg = self.session.unserialize(msg, content=True)
        except Exception:
            # reply with the wrapped exception so the client is not left hanging
            content = error.wrap_exception()
            self.log.error("Bad Query Message: %r", msg, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
            return
        # print client_id, header, parent, content
        #switch on message type:
        msg_type = msg['header']['msg_type']
        self.log.info("client::client %r requested %r", client_id, msg_type)
        handler = self.query_handlers.get(msg_type, None)
        try:
            # assert-inside-try so an unknown msg_type is reported back to the
            # client as a hub_error rather than raising in the hub
            assert handler is not None, "Bad Message Type: %r" % msg_type
        except:
            content = error.wrap_exception()
            self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
            self.session.send(self.query, "hub_error", ident=client_id,
                    content=content)
            return

        else:
            handler(idents, msg)
515
515
516 def dispatch_db(self, msg):
516 def dispatch_db(self, msg):
517 """"""
517 """"""
518 raise NotImplementedError
518 raise NotImplementedError
519
519
520 #---------------------------------------------------------------------------
520 #---------------------------------------------------------------------------
521 # handler methods (1 per event)
521 # handler methods (1 per event)
522 #---------------------------------------------------------------------------
522 #---------------------------------------------------------------------------
523
523
524 #----------------------- Heartbeat --------------------------------------
524 #----------------------- Heartbeat --------------------------------------
525
525
526 def handle_new_heart(self, heart):
526 def handle_new_heart(self, heart):
527 """handler to attach to heartbeater.
527 """handler to attach to heartbeater.
528 Called when a new heart starts to beat.
528 Called when a new heart starts to beat.
529 Triggers completion of registration."""
529 Triggers completion of registration."""
530 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
530 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
531 if heart not in self.incoming_registrations:
531 if heart not in self.incoming_registrations:
532 self.log.info("heartbeat::ignoring new heart: %r", heart)
532 self.log.info("heartbeat::ignoring new heart: %r", heart)
533 else:
533 else:
534 self.finish_registration(heart)
534 self.finish_registration(heart)
535
535
536
536
537 def handle_heart_failure(self, heart):
537 def handle_heart_failure(self, heart):
538 """handler to attach to heartbeater.
538 """handler to attach to heartbeater.
539 called when a previously registered heart fails to respond to beat request.
539 called when a previously registered heart fails to respond to beat request.
540 triggers unregistration"""
540 triggers unregistration"""
541 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
541 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
542 eid = self.hearts.get(heart, None)
542 eid = self.hearts.get(heart, None)
543 queue = self.engines[eid].queue
543 queue = self.engines[eid].queue
544 if eid is None or self.keytable[eid] in self.dead_engines:
544 if eid is None or self.keytable[eid] in self.dead_engines:
545 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
545 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
546 else:
546 else:
547 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
547 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
548
548
549 #----------------------- MUX Queue Traffic ------------------------------
549 #----------------------- MUX Queue Traffic ------------------------------
550
550
    def save_queue_request(self, idents, msg):
        """Record the submission of a MUX-queue request in the db.

        idents is [queue_id, client_id, ...]; the message is logged as
        pending on the target engine.
        """
        if len(idents) < 2:
            self.log.error("invalid identity prefix: %r", idents)
            return
        queue_id, client_id = idents[:2]
        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
            return

        eid = self.by_ident.get(queue_id, None)
        if eid is None:
            self.log.error("queue::target %r not registered", queue_id)
            self.log.debug("queue:: valid are: %r", self.by_ident.keys())
            return
        record = init_record(msg)
        msg_id = record['msg_id']
        self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
        # Unicode in records
        record['engine_uuid'] = queue_id.decode('ascii')
        record['client_uuid'] = client_id.decode('ascii')
        record['queue'] = 'mux'

        try:
            # it's posible iopub arrived first:
            existing = self.db.get_record(msg_id)
            # merge with the existing record: warn on conflicting values,
            # and only fill in keys the new record is missing
            for key,evalue in existing.iteritems():
                rvalue = record.get(key, None)
                if evalue and rvalue and evalue != rvalue:
                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
                elif evalue and not rvalue:
                    record[key] = evalue
            try:
                self.db.update_record(msg_id, record)
            except Exception:
                self.log.error("DB Error updating record %r", msg_id, exc_info=True)
        except KeyError:
            # no existing record: this is the first we hear of msg_id
            try:
                self.db.add_record(msg_id, record)
            except Exception:
                self.log.error("DB Error adding record %r", msg_id, exc_info=True)


        self.pending.add(msg_id)
        self.queues[eid].append(msg_id)
597
597
    def save_queue_result(self, idents, msg):
        """Record the completion of a MUX-queue request.

        Moves msg_id from pending to completed bookkeeping and stores the
        result (header, content, buffers, timestamps) in the db.
        """
        if len(idents) < 2:
            self.log.error("invalid identity prefix: %r", idents)
            return

        client_id, queue_id = idents[:2]
        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("queue::engine %r sent invalid message to %r: %r",
                    queue_id, client_id, msg, exc_info=True)
            return

        eid = self.by_ident.get(queue_id, None)
        if eid is None:
            self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
            return

        parent = msg['parent_header']
        if not parent:
            # a reply with no parent cannot be matched to a request
            return
        msg_id = parent['msg_id']
        if msg_id in self.pending:
            self.pending.remove(msg_id)
            self.all_completed.add(msg_id)
            self.queues[eid].remove(msg_id)
            self.completed[eid].append(msg_id)
            self.log.info("queue::request %r completed on %s", msg_id, eid)
        elif msg_id not in self.all_completed:
            # it could be a result from a dead engine that died before delivering the
            # result
            self.log.warn("queue:: unknown msg finished %r", msg_id)
            return
        # update record anyway, because the unregistration could have been premature
        rheader = msg['header']
        completed = rheader['date']
        started = rheader.get('started', None)
        result = {
            'result_header' : rheader,
            'result_content': msg['content'],
            'received': datetime.now(),
            'started' : started,
            'completed' : completed
        }

        result['result_buffers'] = msg['buffers']
        try:
            self.db.update_record(msg_id, result)
        except Exception:
            self.log.error("DB Error updating record %r", msg_id, exc_info=True)
648
648
649
649
650 #--------------------- Task Queue Traffic ------------------------------
650 #--------------------- Task Queue Traffic ------------------------------
651
651
    def save_task_request(self, idents, msg):
        """Save the submission of a task."""
        client_id = idents[0]

        try:
            msg = self.session.unserialize(msg)
        except Exception:
            self.log.error("task::client %r sent invalid task message: %r",
                    client_id, msg, exc_info=True)
            return
        record = init_record(msg)

        record['client_uuid'] = client_id.decode('ascii')
        record['queue'] = 'task'
        header = msg['header']
        msg_id = header['msg_id']
        # task is pending and (until the scheduler reports a destination)
        # unassigned to any engine
        self.pending.add(msg_id)
        self.unassigned.add(msg_id)
        try:
            # it's posible iopub arrived first:
            existing = self.db.get_record(msg_id)
            if existing['resubmitted']:
                for key in ('submitted', 'client_uuid', 'buffers'):
                    # don't clobber these keys on resubmit
                    # submitted and client_uuid should be different
                    # and buffers might be big, and shouldn't have changed
                    record.pop(key)
                # still check content,header which should not change
                # but are not expensive to compare as buffers

            # merge with the existing record: warn on conflicts, only fill
            # in keys the new record is missing
            for key,evalue in existing.iteritems():
                if key.endswith('buffers'):
                    # don't compare buffers
                    continue
                rvalue = record.get(key, None)
                if evalue and rvalue and evalue != rvalue:
                    self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
                elif evalue and not rvalue:
                    record[key] = evalue
            try:
                self.db.update_record(msg_id, record)
            except Exception:
                self.log.error("DB Error updating record %r", msg_id, exc_info=True)
        except KeyError:
            # no existing record: first sighting of this msg_id
            try:
                self.db.add_record(msg_id, record)
            except Exception:
                self.log.error("DB Error adding record %r", msg_id, exc_info=True)
        except Exception:
            self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
702
702
703 def save_task_result(self, idents, msg):
703 def save_task_result(self, idents, msg):
704 """save the result of a completed task."""
704 """save the result of a completed task."""
705 client_id = idents[0]
705 client_id = idents[0]
706 try:
706 try:
707 msg = self.session.unserialize(msg)
707 msg = self.session.unserialize(msg)
708 except Exception:
708 except Exception:
709 self.log.error("task::invalid task result message send to %r: %r",
709 self.log.error("task::invalid task result message send to %r: %r",
710 client_id, msg, exc_info=True)
710 client_id, msg, exc_info=True)
711 return
711 return
712
712
713 parent = msg['parent_header']
713 parent = msg['parent_header']
714 if not parent:
714 if not parent:
715 # print msg
715 # print msg
716 self.log.warn("Task %r had no parent!", msg)
716 self.log.warn("Task %r had no parent!", msg)
717 return
717 return
718 msg_id = parent['msg_id']
718 msg_id = parent['msg_id']
719 if msg_id in self.unassigned:
719 if msg_id in self.unassigned:
720 self.unassigned.remove(msg_id)
720 self.unassigned.remove(msg_id)
721
721
722 header = msg['header']
722 header = msg['header']
723 engine_uuid = header.get('engine', u'')
723 engine_uuid = header.get('engine', u'')
724 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
724 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
725
725
726 status = header.get('status', None)
726 status = header.get('status', None)
727
727
728 if msg_id in self.pending:
728 if msg_id in self.pending:
729 self.log.info("task::task %r finished on %s", msg_id, eid)
729 self.log.info("task::task %r finished on %s", msg_id, eid)
730 self.pending.remove(msg_id)
730 self.pending.remove(msg_id)
731 self.all_completed.add(msg_id)
731 self.all_completed.add(msg_id)
732 if eid is not None:
732 if eid is not None:
733 if status != 'aborted':
733 if status != 'aborted':
734 self.completed[eid].append(msg_id)
734 self.completed[eid].append(msg_id)
735 if msg_id in self.tasks[eid]:
735 if msg_id in self.tasks[eid]:
736 self.tasks[eid].remove(msg_id)
736 self.tasks[eid].remove(msg_id)
737 completed = header['date']
737 completed = header['date']
738 started = header.get('started', None)
738 started = header.get('started', None)
739 result = {
739 result = {
740 'result_header' : header,
740 'result_header' : header,
741 'result_content': msg['content'],
741 'result_content': msg['content'],
742 'started' : started,
742 'started' : started,
743 'completed' : completed,
743 'completed' : completed,
744 'received' : datetime.now(),
744 'received' : datetime.now(),
745 'engine_uuid': engine_uuid,
745 'engine_uuid': engine_uuid,
746 }
746 }
747
747
748 result['result_buffers'] = msg['buffers']
748 result['result_buffers'] = msg['buffers']
749 try:
749 try:
750 self.db.update_record(msg_id, result)
750 self.db.update_record(msg_id, result)
751 except Exception:
751 except Exception:
752 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
752 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
753
753
754 else:
754 else:
755 self.log.debug("task::unknown task %r finished", msg_id)
755 self.log.debug("task::unknown task %r finished", msg_id)
756
756
757 def save_task_destination(self, idents, msg):
757 def save_task_destination(self, idents, msg):
758 try:
758 try:
759 msg = self.session.unserialize(msg, content=True)
759 msg = self.session.unserialize(msg, content=True)
760 except Exception:
760 except Exception:
761 self.log.error("task::invalid task tracking message", exc_info=True)
761 self.log.error("task::invalid task tracking message", exc_info=True)
762 return
762 return
763 content = msg['content']
763 content = msg['content']
764 # print (content)
764 # print (content)
765 msg_id = content['msg_id']
765 msg_id = content['msg_id']
766 engine_uuid = content['engine_id']
766 engine_uuid = content['engine_id']
767 eid = self.by_ident[cast_bytes(engine_uuid)]
767 eid = self.by_ident[cast_bytes(engine_uuid)]
768
768
769 self.log.info("task::task %r arrived on %r", msg_id, eid)
769 self.log.info("task::task %r arrived on %r", msg_id, eid)
770 if msg_id in self.unassigned:
770 if msg_id in self.unassigned:
771 self.unassigned.remove(msg_id)
771 self.unassigned.remove(msg_id)
772 # else:
772 # else:
773 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
773 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
774
774
775 self.tasks[eid].append(msg_id)
775 self.tasks[eid].append(msg_id)
776 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
776 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
777 try:
777 try:
778 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
778 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
779 except Exception:
779 except Exception:
780 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
780 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
781
781
782
782
783 def mia_task_request(self, idents, msg):
783 def mia_task_request(self, idents, msg):
784 raise NotImplementedError
784 raise NotImplementedError
785 client_id = idents[0]
785 client_id = idents[0]
786 # content = dict(mia=self.mia,status='ok')
786 # content = dict(mia=self.mia,status='ok')
787 # self.session.send('mia_reply', content=content, idents=client_id)
787 # self.session.send('mia_reply', content=content, idents=client_id)
788
788
789
789
790 #--------------------- IOPub Traffic ------------------------------
790 #--------------------- IOPub Traffic ------------------------------
791
791
792 def save_iopub_message(self, topics, msg):
792 def save_iopub_message(self, topics, msg):
793 """save an iopub message into the db"""
793 """save an iopub message into the db"""
794 # print (topics)
794 # print (topics)
795 try:
795 try:
796 msg = self.session.unserialize(msg, content=True)
796 msg = self.session.unserialize(msg, content=True)
797 except Exception:
797 except Exception:
798 self.log.error("iopub::invalid IOPub message", exc_info=True)
798 self.log.error("iopub::invalid IOPub message", exc_info=True)
799 return
799 return
800
800
801 parent = msg['parent_header']
801 parent = msg['parent_header']
802 if not parent:
802 if not parent:
803 self.log.warn("iopub::IOPub message lacks parent: %r", msg)
803 self.log.warn("iopub::IOPub message lacks parent: %r", msg)
804 return
804 return
805 msg_id = parent['msg_id']
805 msg_id = parent['msg_id']
806 msg_type = msg['header']['msg_type']
806 msg_type = msg['header']['msg_type']
807 content = msg['content']
807 content = msg['content']
808
808
809 # ensure msg_id is in db
809 # ensure msg_id is in db
810 try:
810 try:
811 rec = self.db.get_record(msg_id)
811 rec = self.db.get_record(msg_id)
812 except KeyError:
812 except KeyError:
813 rec = empty_record()
813 rec = empty_record()
814 rec['msg_id'] = msg_id
814 rec['msg_id'] = msg_id
815 self.db.add_record(msg_id, rec)
815 self.db.add_record(msg_id, rec)
816 # stream
816 # stream
817 d = {}
817 d = {}
818 if msg_type == 'stream':
818 if msg_type == 'stream':
819 name = content['name']
819 name = content['name']
820 s = rec[name] or ''
820 s = rec[name] or ''
821 d[name] = s + content['data']
821 d[name] = s + content['data']
822
822
823 elif msg_type == 'pyerr':
823 elif msg_type == 'pyerr':
824 d['pyerr'] = content
824 d['pyerr'] = content
825 elif msg_type == 'pyin':
825 elif msg_type == 'pyin':
826 d['pyin'] = content['code']
826 d['pyin'] = content['code']
827 elif msg_type in ('display_data', 'pyout'):
828 d[msg_type] = content
829 elif msg_type == 'status':
830 pass
827 else:
831 else:
828 d[msg_type] = content.get('data', '')
832 self.log.warn("unhandled iopub msg_type: %r", msg_type)
833
834 if not d:
835 return
829
836
830 try:
837 try:
831 self.db.update_record(msg_id, d)
838 self.db.update_record(msg_id, d)
832 except Exception:
839 except Exception:
833 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
840 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
834
841
835
842
836
843
837 #-------------------------------------------------------------------------
844 #-------------------------------------------------------------------------
838 # Registration requests
845 # Registration requests
839 #-------------------------------------------------------------------------
846 #-------------------------------------------------------------------------
840
847
841 def connection_request(self, client_id, msg):
848 def connection_request(self, client_id, msg):
842 """Reply with connection addresses for clients."""
849 """Reply with connection addresses for clients."""
843 self.log.info("client::client %r connected", client_id)
850 self.log.info("client::client %r connected", client_id)
844 content = dict(status='ok')
851 content = dict(status='ok')
845 content.update(self.client_info)
852 content.update(self.client_info)
846 jsonable = {}
853 jsonable = {}
847 for k,v in self.keytable.iteritems():
854 for k,v in self.keytable.iteritems():
848 if v not in self.dead_engines:
855 if v not in self.dead_engines:
849 jsonable[str(k)] = v.decode('ascii')
856 jsonable[str(k)] = v.decode('ascii')
850 content['engines'] = jsonable
857 content['engines'] = jsonable
851 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
858 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
852
859
853 def register_engine(self, reg, msg):
860 def register_engine(self, reg, msg):
854 """Register a new engine."""
861 """Register a new engine."""
855 content = msg['content']
862 content = msg['content']
856 try:
863 try:
857 queue = cast_bytes(content['queue'])
864 queue = cast_bytes(content['queue'])
858 except KeyError:
865 except KeyError:
859 self.log.error("registration::queue not specified", exc_info=True)
866 self.log.error("registration::queue not specified", exc_info=True)
860 return
867 return
861 heart = content.get('heartbeat', None)
868 heart = content.get('heartbeat', None)
862 if heart:
869 if heart:
863 heart = cast_bytes(heart)
870 heart = cast_bytes(heart)
864 """register a new engine, and create the socket(s) necessary"""
871 """register a new engine, and create the socket(s) necessary"""
865 eid = self._next_id
872 eid = self._next_id
866 # print (eid, queue, reg, heart)
873 # print (eid, queue, reg, heart)
867
874
868 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
875 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
869
876
870 content = dict(id=eid,status='ok')
877 content = dict(id=eid,status='ok')
871 content.update(self.engine_info)
878 content.update(self.engine_info)
872 # check if requesting available IDs:
879 # check if requesting available IDs:
873 if queue in self.by_ident:
880 if queue in self.by_ident:
874 try:
881 try:
875 raise KeyError("queue_id %r in use" % queue)
882 raise KeyError("queue_id %r in use" % queue)
876 except:
883 except:
877 content = error.wrap_exception()
884 content = error.wrap_exception()
878 self.log.error("queue_id %r in use", queue, exc_info=True)
885 self.log.error("queue_id %r in use", queue, exc_info=True)
879 elif heart in self.hearts: # need to check unique hearts?
886 elif heart in self.hearts: # need to check unique hearts?
880 try:
887 try:
881 raise KeyError("heart_id %r in use" % heart)
888 raise KeyError("heart_id %r in use" % heart)
882 except:
889 except:
883 self.log.error("heart_id %r in use", heart, exc_info=True)
890 self.log.error("heart_id %r in use", heart, exc_info=True)
884 content = error.wrap_exception()
891 content = error.wrap_exception()
885 else:
892 else:
886 for h, pack in self.incoming_registrations.iteritems():
893 for h, pack in self.incoming_registrations.iteritems():
887 if heart == h:
894 if heart == h:
888 try:
895 try:
889 raise KeyError("heart_id %r in use" % heart)
896 raise KeyError("heart_id %r in use" % heart)
890 except:
897 except:
891 self.log.error("heart_id %r in use", heart, exc_info=True)
898 self.log.error("heart_id %r in use", heart, exc_info=True)
892 content = error.wrap_exception()
899 content = error.wrap_exception()
893 break
900 break
894 elif queue == pack[1]:
901 elif queue == pack[1]:
895 try:
902 try:
896 raise KeyError("queue_id %r in use" % queue)
903 raise KeyError("queue_id %r in use" % queue)
897 except:
904 except:
898 self.log.error("queue_id %r in use", queue, exc_info=True)
905 self.log.error("queue_id %r in use", queue, exc_info=True)
899 content = error.wrap_exception()
906 content = error.wrap_exception()
900 break
907 break
901
908
902 msg = self.session.send(self.query, "registration_reply",
909 msg = self.session.send(self.query, "registration_reply",
903 content=content,
910 content=content,
904 ident=reg)
911 ident=reg)
905
912
906 if content['status'] == 'ok':
913 if content['status'] == 'ok':
907 if heart in self.heartmonitor.hearts:
914 if heart in self.heartmonitor.hearts:
908 # already beating
915 # already beating
909 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
916 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
910 self.finish_registration(heart)
917 self.finish_registration(heart)
911 else:
918 else:
912 purge = lambda : self._purge_stalled_registration(heart)
919 purge = lambda : self._purge_stalled_registration(heart)
913 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
920 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
914 dc.start()
921 dc.start()
915 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
922 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
916 else:
923 else:
917 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
924 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
918 return eid
925 return eid
919
926
920 def unregister_engine(self, ident, msg):
927 def unregister_engine(self, ident, msg):
921 """Unregister an engine that explicitly requested to leave."""
928 """Unregister an engine that explicitly requested to leave."""
922 try:
929 try:
923 eid = msg['content']['id']
930 eid = msg['content']['id']
924 except:
931 except:
925 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
932 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
926 return
933 return
927 self.log.info("registration::unregister_engine(%r)", eid)
934 self.log.info("registration::unregister_engine(%r)", eid)
928 # print (eid)
935 # print (eid)
929 uuid = self.keytable[eid]
936 uuid = self.keytable[eid]
930 content=dict(id=eid, queue=uuid.decode('ascii'))
937 content=dict(id=eid, queue=uuid.decode('ascii'))
931 self.dead_engines.add(uuid)
938 self.dead_engines.add(uuid)
932 # self.ids.remove(eid)
939 # self.ids.remove(eid)
933 # uuid = self.keytable.pop(eid)
940 # uuid = self.keytable.pop(eid)
934 #
941 #
935 # ec = self.engines.pop(eid)
942 # ec = self.engines.pop(eid)
936 # self.hearts.pop(ec.heartbeat)
943 # self.hearts.pop(ec.heartbeat)
937 # self.by_ident.pop(ec.queue)
944 # self.by_ident.pop(ec.queue)
938 # self.completed.pop(eid)
945 # self.completed.pop(eid)
939 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
946 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
940 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
947 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
941 dc.start()
948 dc.start()
942 ############## TODO: HANDLE IT ################
949 ############## TODO: HANDLE IT ################
943
950
944 if self.notifier:
951 if self.notifier:
945 self.session.send(self.notifier, "unregistration_notification", content=content)
952 self.session.send(self.notifier, "unregistration_notification", content=content)
946
953
947 def _handle_stranded_msgs(self, eid, uuid):
954 def _handle_stranded_msgs(self, eid, uuid):
948 """Handle messages known to be on an engine when the engine unregisters.
955 """Handle messages known to be on an engine when the engine unregisters.
949
956
950 It is possible that this will fire prematurely - that is, an engine will
957 It is possible that this will fire prematurely - that is, an engine will
951 go down after completing a result, and the client will be notified
958 go down after completing a result, and the client will be notified
952 that the result failed and later receive the actual result.
959 that the result failed and later receive the actual result.
953 """
960 """
954
961
955 outstanding = self.queues[eid]
962 outstanding = self.queues[eid]
956
963
957 for msg_id in outstanding:
964 for msg_id in outstanding:
958 self.pending.remove(msg_id)
965 self.pending.remove(msg_id)
959 self.all_completed.add(msg_id)
966 self.all_completed.add(msg_id)
960 try:
967 try:
961 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
968 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
962 except:
969 except:
963 content = error.wrap_exception()
970 content = error.wrap_exception()
964 # build a fake header:
971 # build a fake header:
965 header = {}
972 header = {}
966 header['engine'] = uuid
973 header['engine'] = uuid
967 header['date'] = datetime.now()
974 header['date'] = datetime.now()
968 rec = dict(result_content=content, result_header=header, result_buffers=[])
975 rec = dict(result_content=content, result_header=header, result_buffers=[])
969 rec['completed'] = header['date']
976 rec['completed'] = header['date']
970 rec['engine_uuid'] = uuid
977 rec['engine_uuid'] = uuid
971 try:
978 try:
972 self.db.update_record(msg_id, rec)
979 self.db.update_record(msg_id, rec)
973 except Exception:
980 except Exception:
974 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
981 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
975
982
976
983
977 def finish_registration(self, heart):
984 def finish_registration(self, heart):
978 """Second half of engine registration, called after our HeartMonitor
985 """Second half of engine registration, called after our HeartMonitor
979 has received a beat from the Engine's Heart."""
986 has received a beat from the Engine's Heart."""
980 try:
987 try:
981 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
988 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
982 except KeyError:
989 except KeyError:
983 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
990 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
984 return
991 return
985 self.log.info("registration::finished registering engine %i:%r", eid, queue)
992 self.log.info("registration::finished registering engine %i:%r", eid, queue)
986 if purge is not None:
993 if purge is not None:
987 purge.stop()
994 purge.stop()
988 control = queue
995 control = queue
989 self.ids.add(eid)
996 self.ids.add(eid)
990 self.keytable[eid] = queue
997 self.keytable[eid] = queue
991 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
998 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
992 control=control, heartbeat=heart)
999 control=control, heartbeat=heart)
993 self.by_ident[queue] = eid
1000 self.by_ident[queue] = eid
994 self.queues[eid] = list()
1001 self.queues[eid] = list()
995 self.tasks[eid] = list()
1002 self.tasks[eid] = list()
996 self.completed[eid] = list()
1003 self.completed[eid] = list()
997 self.hearts[heart] = eid
1004 self.hearts[heart] = eid
998 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
1005 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
999 if self.notifier:
1006 if self.notifier:
1000 self.session.send(self.notifier, "registration_notification", content=content)
1007 self.session.send(self.notifier, "registration_notification", content=content)
1001 self.log.info("engine::Engine Connected: %i", eid)
1008 self.log.info("engine::Engine Connected: %i", eid)
1002
1009
1003 def _purge_stalled_registration(self, heart):
1010 def _purge_stalled_registration(self, heart):
1004 if heart in self.incoming_registrations:
1011 if heart in self.incoming_registrations:
1005 eid = self.incoming_registrations.pop(heart)[0]
1012 eid = self.incoming_registrations.pop(heart)[0]
1006 self.log.info("registration::purging stalled registration: %i", eid)
1013 self.log.info("registration::purging stalled registration: %i", eid)
1007 else:
1014 else:
1008 pass
1015 pass
1009
1016
1010 #-------------------------------------------------------------------------
1017 #-------------------------------------------------------------------------
1011 # Client Requests
1018 # Client Requests
1012 #-------------------------------------------------------------------------
1019 #-------------------------------------------------------------------------
1013
1020
1014 def shutdown_request(self, client_id, msg):
1021 def shutdown_request(self, client_id, msg):
1015 """handle shutdown request."""
1022 """handle shutdown request."""
1016 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1023 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1017 # also notify other clients of shutdown
1024 # also notify other clients of shutdown
1018 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1025 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1019 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1026 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1020 dc.start()
1027 dc.start()
1021
1028
1022 def _shutdown(self):
1029 def _shutdown(self):
1023 self.log.info("hub::hub shutting down.")
1030 self.log.info("hub::hub shutting down.")
1024 time.sleep(0.1)
1031 time.sleep(0.1)
1025 sys.exit(0)
1032 sys.exit(0)
1026
1033
1027
1034
1028 def check_load(self, client_id, msg):
1035 def check_load(self, client_id, msg):
1029 content = msg['content']
1036 content = msg['content']
1030 try:
1037 try:
1031 targets = content['targets']
1038 targets = content['targets']
1032 targets = self._validate_targets(targets)
1039 targets = self._validate_targets(targets)
1033 except:
1040 except:
1034 content = error.wrap_exception()
1041 content = error.wrap_exception()
1035 self.session.send(self.query, "hub_error",
1042 self.session.send(self.query, "hub_error",
1036 content=content, ident=client_id)
1043 content=content, ident=client_id)
1037 return
1044 return
1038
1045
1039 content = dict(status='ok')
1046 content = dict(status='ok')
1040 # loads = {}
1047 # loads = {}
1041 for t in targets:
1048 for t in targets:
1042 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1049 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1043 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1050 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1044
1051
1045
1052
1046 def queue_status(self, client_id, msg):
1053 def queue_status(self, client_id, msg):
1047 """Return the Queue status of one or more targets.
1054 """Return the Queue status of one or more targets.
1048 if verbose: return the msg_ids
1055 if verbose: return the msg_ids
1049 else: return len of each type.
1056 else: return len of each type.
1050 keys: queue (pending MUX jobs)
1057 keys: queue (pending MUX jobs)
1051 tasks (pending Task jobs)
1058 tasks (pending Task jobs)
1052 completed (finished jobs from both queues)"""
1059 completed (finished jobs from both queues)"""
1053 content = msg['content']
1060 content = msg['content']
1054 targets = content['targets']
1061 targets = content['targets']
1055 try:
1062 try:
1056 targets = self._validate_targets(targets)
1063 targets = self._validate_targets(targets)
1057 except:
1064 except:
1058 content = error.wrap_exception()
1065 content = error.wrap_exception()
1059 self.session.send(self.query, "hub_error",
1066 self.session.send(self.query, "hub_error",
1060 content=content, ident=client_id)
1067 content=content, ident=client_id)
1061 return
1068 return
1062 verbose = content.get('verbose', False)
1069 verbose = content.get('verbose', False)
1063 content = dict(status='ok')
1070 content = dict(status='ok')
1064 for t in targets:
1071 for t in targets:
1065 queue = self.queues[t]
1072 queue = self.queues[t]
1066 completed = self.completed[t]
1073 completed = self.completed[t]
1067 tasks = self.tasks[t]
1074 tasks = self.tasks[t]
1068 if not verbose:
1075 if not verbose:
1069 queue = len(queue)
1076 queue = len(queue)
1070 completed = len(completed)
1077 completed = len(completed)
1071 tasks = len(tasks)
1078 tasks = len(tasks)
1072 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1079 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1073 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1080 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1074 # print (content)
1081 # print (content)
1075 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1082 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1076
1083
1077 def purge_results(self, client_id, msg):
1084 def purge_results(self, client_id, msg):
1078 """Purge results from memory. This method is more valuable before we move
1085 """Purge results from memory. This method is more valuable before we move
1079 to a DB based message storage mechanism."""
1086 to a DB based message storage mechanism."""
1080 content = msg['content']
1087 content = msg['content']
1081 self.log.info("Dropping records with %s", content)
1088 self.log.info("Dropping records with %s", content)
1082 msg_ids = content.get('msg_ids', [])
1089 msg_ids = content.get('msg_ids', [])
1083 reply = dict(status='ok')
1090 reply = dict(status='ok')
1084 if msg_ids == 'all':
1091 if msg_ids == 'all':
1085 try:
1092 try:
1086 self.db.drop_matching_records(dict(completed={'$ne':None}))
1093 self.db.drop_matching_records(dict(completed={'$ne':None}))
1087 except Exception:
1094 except Exception:
1088 reply = error.wrap_exception()
1095 reply = error.wrap_exception()
1089 else:
1096 else:
1090 pending = filter(lambda m: m in self.pending, msg_ids)
1097 pending = filter(lambda m: m in self.pending, msg_ids)
1091 if pending:
1098 if pending:
1092 try:
1099 try:
1093 raise IndexError("msg pending: %r" % pending[0])
1100 raise IndexError("msg pending: %r" % pending[0])
1094 except:
1101 except:
1095 reply = error.wrap_exception()
1102 reply = error.wrap_exception()
1096 else:
1103 else:
1097 try:
1104 try:
1098 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1105 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1099 except Exception:
1106 except Exception:
1100 reply = error.wrap_exception()
1107 reply = error.wrap_exception()
1101
1108
1102 if reply['status'] == 'ok':
1109 if reply['status'] == 'ok':
1103 eids = content.get('engine_ids', [])
1110 eids = content.get('engine_ids', [])
1104 for eid in eids:
1111 for eid in eids:
1105 if eid not in self.engines:
1112 if eid not in self.engines:
1106 try:
1113 try:
1107 raise IndexError("No such engine: %i" % eid)
1114 raise IndexError("No such engine: %i" % eid)
1108 except:
1115 except:
1109 reply = error.wrap_exception()
1116 reply = error.wrap_exception()
1110 break
1117 break
1111 uid = self.engines[eid].queue
1118 uid = self.engines[eid].queue
1112 try:
1119 try:
1113 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1120 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1114 except Exception:
1121 except Exception:
1115 reply = error.wrap_exception()
1122 reply = error.wrap_exception()
1116 break
1123 break
1117
1124
1118 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1125 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1119
1126
1120 def resubmit_task(self, client_id, msg):
1127 def resubmit_task(self, client_id, msg):
1121 """Resubmit one or more tasks."""
1128 """Resubmit one or more tasks."""
1122 def finish(reply):
1129 def finish(reply):
1123 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1130 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1124
1131
1125 content = msg['content']
1132 content = msg['content']
1126 msg_ids = content['msg_ids']
1133 msg_ids = content['msg_ids']
1127 reply = dict(status='ok')
1134 reply = dict(status='ok')
1128 try:
1135 try:
1129 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1136 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1130 'header', 'content', 'buffers'])
1137 'header', 'content', 'buffers'])
1131 except Exception:
1138 except Exception:
1132 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1139 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1133 return finish(error.wrap_exception())
1140 return finish(error.wrap_exception())
1134
1141
1135 # validate msg_ids
1142 # validate msg_ids
1136 found_ids = [ rec['msg_id'] for rec in records ]
1143 found_ids = [ rec['msg_id'] for rec in records ]
1137 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1144 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1138 if len(records) > len(msg_ids):
1145 if len(records) > len(msg_ids):
1139 try:
1146 try:
1140 raise RuntimeError("DB appears to be in an inconsistent state."
1147 raise RuntimeError("DB appears to be in an inconsistent state."
1141 "More matching records were found than should exist")
1148 "More matching records were found than should exist")
1142 except Exception:
1149 except Exception:
1143 return finish(error.wrap_exception())
1150 return finish(error.wrap_exception())
1144 elif len(records) < len(msg_ids):
1151 elif len(records) < len(msg_ids):
1145 missing = [ m for m in msg_ids if m not in found_ids ]
1152 missing = [ m for m in msg_ids if m not in found_ids ]
1146 try:
1153 try:
1147 raise KeyError("No such msg(s): %r" % missing)
1154 raise KeyError("No such msg(s): %r" % missing)
1148 except KeyError:
1155 except KeyError:
1149 return finish(error.wrap_exception())
1156 return finish(error.wrap_exception())
1150 elif pending_ids:
1157 elif pending_ids:
1151 pass
1158 pass
1152 # no need to raise on resubmit of pending task, now that we
1159 # no need to raise on resubmit of pending task, now that we
1153 # resubmit under new ID, but do we want to raise anyway?
1160 # resubmit under new ID, but do we want to raise anyway?
1154 # msg_id = invalid_ids[0]
1161 # msg_id = invalid_ids[0]
1155 # try:
1162 # try:
1156 # raise ValueError("Task(s) %r appears to be inflight" % )
1163 # raise ValueError("Task(s) %r appears to be inflight" % )
1157 # except Exception:
1164 # except Exception:
1158 # return finish(error.wrap_exception())
1165 # return finish(error.wrap_exception())
1159
1166
1160 # mapping of original IDs to resubmitted IDs
1167 # mapping of original IDs to resubmitted IDs
1161 resubmitted = {}
1168 resubmitted = {}
1162
1169
1163 # send the messages
1170 # send the messages
1164 for rec in records:
1171 for rec in records:
1165 header = rec['header']
1172 header = rec['header']
1166 msg = self.session.msg(header['msg_type'], parent=header)
1173 msg = self.session.msg(header['msg_type'], parent=header)
1167 msg_id = msg['msg_id']
1174 msg_id = msg['msg_id']
1168 msg['content'] = rec['content']
1175 msg['content'] = rec['content']
1169
1176
1170 # use the old header, but update msg_id and timestamp
1177 # use the old header, but update msg_id and timestamp
1171 fresh = msg['header']
1178 fresh = msg['header']
1172 header['msg_id'] = fresh['msg_id']
1179 header['msg_id'] = fresh['msg_id']
1173 header['date'] = fresh['date']
1180 header['date'] = fresh['date']
1174 msg['header'] = header
1181 msg['header'] = header
1175
1182
1176 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1183 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1177
1184
1178 resubmitted[rec['msg_id']] = msg_id
1185 resubmitted[rec['msg_id']] = msg_id
1179 self.pending.add(msg_id)
1186 self.pending.add(msg_id)
1180 msg['buffers'] = rec['buffers']
1187 msg['buffers'] = rec['buffers']
1181 try:
1188 try:
1182 self.db.add_record(msg_id, init_record(msg))
1189 self.db.add_record(msg_id, init_record(msg))
1183 except Exception:
1190 except Exception:
1184 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1191 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1185
1192
1186 finish(dict(status='ok', resubmitted=resubmitted))
1193 finish(dict(status='ok', resubmitted=resubmitted))
1187
1194
1188 # store the new IDs in the Task DB
1195 # store the new IDs in the Task DB
1189 for msg_id, resubmit_id in resubmitted.iteritems():
1196 for msg_id, resubmit_id in resubmitted.iteritems():
1190 try:
1197 try:
1191 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1198 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1192 except Exception:
1199 except Exception:
1193 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1200 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1194
1201
1195
1202
1196 def _extract_record(self, rec):
1203 def _extract_record(self, rec):
1197 """decompose a TaskRecord dict into subsection of reply for get_result"""
1204 """decompose a TaskRecord dict into subsection of reply for get_result"""
1198 io_dict = {}
1205 io_dict = {}
1199 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1206 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1200 io_dict[key] = rec[key]
1207 io_dict[key] = rec[key]
1201 content = { 'result_content': rec['result_content'],
1208 content = { 'result_content': rec['result_content'],
1202 'header': rec['header'],
1209 'header': rec['header'],
1203 'result_header' : rec['result_header'],
1210 'result_header' : rec['result_header'],
1204 'received' : rec['received'],
1211 'received' : rec['received'],
1205 'io' : io_dict,
1212 'io' : io_dict,
1206 }
1213 }
1207 if rec['result_buffers']:
1214 if rec['result_buffers']:
1208 buffers = map(bytes, rec['result_buffers'])
1215 buffers = map(bytes, rec['result_buffers'])
1209 else:
1216 else:
1210 buffers = []
1217 buffers = []
1211
1218
1212 return content, buffers
1219 return content, buffers
1213
1220
1214 def get_results(self, client_id, msg):
1221 def get_results(self, client_id, msg):
1215 """Get the result of 1 or more messages."""
1222 """Get the result of 1 or more messages."""
1216 content = msg['content']
1223 content = msg['content']
1217 msg_ids = sorted(set(content['msg_ids']))
1224 msg_ids = sorted(set(content['msg_ids']))
1218 statusonly = content.get('status_only', False)
1225 statusonly = content.get('status_only', False)
1219 pending = []
1226 pending = []
1220 completed = []
1227 completed = []
1221 content = dict(status='ok')
1228 content = dict(status='ok')
1222 content['pending'] = pending
1229 content['pending'] = pending
1223 content['completed'] = completed
1230 content['completed'] = completed
1224 buffers = []
1231 buffers = []
1225 if not statusonly:
1232 if not statusonly:
1226 try:
1233 try:
1227 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1234 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1228 # turn match list into dict, for faster lookup
1235 # turn match list into dict, for faster lookup
1229 records = {}
1236 records = {}
1230 for rec in matches:
1237 for rec in matches:
1231 records[rec['msg_id']] = rec
1238 records[rec['msg_id']] = rec
1232 except Exception:
1239 except Exception:
1233 content = error.wrap_exception()
1240 content = error.wrap_exception()
1234 self.session.send(self.query, "result_reply", content=content,
1241 self.session.send(self.query, "result_reply", content=content,
1235 parent=msg, ident=client_id)
1242 parent=msg, ident=client_id)
1236 return
1243 return
1237 else:
1244 else:
1238 records = {}
1245 records = {}
1239 for msg_id in msg_ids:
1246 for msg_id in msg_ids:
1240 if msg_id in self.pending:
1247 if msg_id in self.pending:
1241 pending.append(msg_id)
1248 pending.append(msg_id)
1242 elif msg_id in self.all_completed:
1249 elif msg_id in self.all_completed:
1243 completed.append(msg_id)
1250 completed.append(msg_id)
1244 if not statusonly:
1251 if not statusonly:
1245 c,bufs = self._extract_record(records[msg_id])
1252 c,bufs = self._extract_record(records[msg_id])
1246 content[msg_id] = c
1253 content[msg_id] = c
1247 buffers.extend(bufs)
1254 buffers.extend(bufs)
1248 elif msg_id in records:
1255 elif msg_id in records:
1249 if rec['completed']:
1256 if rec['completed']:
1250 completed.append(msg_id)
1257 completed.append(msg_id)
1251 c,bufs = self._extract_record(records[msg_id])
1258 c,bufs = self._extract_record(records[msg_id])
1252 content[msg_id] = c
1259 content[msg_id] = c
1253 buffers.extend(bufs)
1260 buffers.extend(bufs)
1254 else:
1261 else:
1255 pending.append(msg_id)
1262 pending.append(msg_id)
1256 else:
1263 else:
1257 try:
1264 try:
1258 raise KeyError('No such message: '+msg_id)
1265 raise KeyError('No such message: '+msg_id)
1259 except:
1266 except:
1260 content = error.wrap_exception()
1267 content = error.wrap_exception()
1261 break
1268 break
1262 self.session.send(self.query, "result_reply", content=content,
1269 self.session.send(self.query, "result_reply", content=content,
1263 parent=msg, ident=client_id,
1270 parent=msg, ident=client_id,
1264 buffers=buffers)
1271 buffers=buffers)
1265
1272
1266 def get_history(self, client_id, msg):
1273 def get_history(self, client_id, msg):
1267 """Get a list of all msg_ids in our DB records"""
1274 """Get a list of all msg_ids in our DB records"""
1268 try:
1275 try:
1269 msg_ids = self.db.get_history()
1276 msg_ids = self.db.get_history()
1270 except Exception as e:
1277 except Exception as e:
1271 content = error.wrap_exception()
1278 content = error.wrap_exception()
1272 else:
1279 else:
1273 content = dict(status='ok', history=msg_ids)
1280 content = dict(status='ok', history=msg_ids)
1274
1281
1275 self.session.send(self.query, "history_reply", content=content,
1282 self.session.send(self.query, "history_reply", content=content,
1276 parent=msg, ident=client_id)
1283 parent=msg, ident=client_id)
1277
1284
1278 def db_query(self, client_id, msg):
1285 def db_query(self, client_id, msg):
1279 """Perform a raw query on the task record database."""
1286 """Perform a raw query on the task record database."""
1280 content = msg['content']
1287 content = msg['content']
1281 query = content.get('query', {})
1288 query = content.get('query', {})
1282 keys = content.get('keys', None)
1289 keys = content.get('keys', None)
1283 buffers = []
1290 buffers = []
1284 empty = list()
1291 empty = list()
1285 try:
1292 try:
1286 records = self.db.find_records(query, keys)
1293 records = self.db.find_records(query, keys)
1287 except Exception as e:
1294 except Exception as e:
1288 content = error.wrap_exception()
1295 content = error.wrap_exception()
1289 else:
1296 else:
1290 # extract buffers from reply content:
1297 # extract buffers from reply content:
1291 if keys is not None:
1298 if keys is not None:
1292 buffer_lens = [] if 'buffers' in keys else None
1299 buffer_lens = [] if 'buffers' in keys else None
1293 result_buffer_lens = [] if 'result_buffers' in keys else None
1300 result_buffer_lens = [] if 'result_buffers' in keys else None
1294 else:
1301 else:
1295 buffer_lens = None
1302 buffer_lens = None
1296 result_buffer_lens = None
1303 result_buffer_lens = None
1297
1304
1298 for rec in records:
1305 for rec in records:
1299 # buffers may be None, so double check
1306 # buffers may be None, so double check
1300 b = rec.pop('buffers', empty) or empty
1307 b = rec.pop('buffers', empty) or empty
1301 if buffer_lens is not None:
1308 if buffer_lens is not None:
1302 buffer_lens.append(len(b))
1309 buffer_lens.append(len(b))
1303 buffers.extend(b)
1310 buffers.extend(b)
1304 rb = rec.pop('result_buffers', empty) or empty
1311 rb = rec.pop('result_buffers', empty) or empty
1305 if result_buffer_lens is not None:
1312 if result_buffer_lens is not None:
1306 result_buffer_lens.append(len(rb))
1313 result_buffer_lens.append(len(rb))
1307 buffers.extend(rb)
1314 buffers.extend(rb)
1308 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1315 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1309 result_buffer_lens=result_buffer_lens)
1316 result_buffer_lens=result_buffer_lens)
1310 # self.log.debug (content)
1317 # self.log.debug (content)
1311 self.session.send(self.query, "db_reply", content=content,
1318 self.session.send(self.query, "db_reply", content=content,
1312 parent=msg, ident=client_id,
1319 parent=msg, ident=client_id,
1313 buffers=buffers)
1320 buffers=buffers)
1314
1321
@@ -1,436 +1,455 b''
1 """Tests for parallel client.py
1 """Tests for parallel client.py
2
2
3 Authors:
3 Authors:
4
4
5 * Min RK
5 * Min RK
6 """
6 """
7
7
8 #-------------------------------------------------------------------------------
8 #-------------------------------------------------------------------------------
9 # Copyright (C) 2011 The IPython Development Team
9 # Copyright (C) 2011 The IPython Development Team
10 #
10 #
11 # Distributed under the terms of the BSD License. The full license is in
11 # Distributed under the terms of the BSD License. The full license is in
12 # the file COPYING, distributed as part of this software.
12 # the file COPYING, distributed as part of this software.
13 #-------------------------------------------------------------------------------
13 #-------------------------------------------------------------------------------
14
14
15 #-------------------------------------------------------------------------------
15 #-------------------------------------------------------------------------------
16 # Imports
16 # Imports
17 #-------------------------------------------------------------------------------
17 #-------------------------------------------------------------------------------
18
18
19 from __future__ import division
19 from __future__ import division
20
20
21 import time
21 import time
22 from datetime import datetime
22 from datetime import datetime
23 from tempfile import mktemp
23 from tempfile import mktemp
24
24
25 import zmq
25 import zmq
26
26
27 from IPython import parallel
27 from IPython import parallel
28 from IPython.parallel.client import client as clientmod
28 from IPython.parallel.client import client as clientmod
29 from IPython.parallel import error
29 from IPython.parallel import error
30 from IPython.parallel import AsyncResult, AsyncHubResult
30 from IPython.parallel import AsyncResult, AsyncHubResult
31 from IPython.parallel import LoadBalancedView, DirectView
31 from IPython.parallel import LoadBalancedView, DirectView
32
32
33 from clienttest import ClusterTestCase, segfault, wait, add_engines
33 from clienttest import ClusterTestCase, segfault, wait, add_engines
34
34
35 def setup():
35 def setup():
36 add_engines(4, total=True)
36 add_engines(4, total=True)
37
37
38 class TestClient(ClusterTestCase):
38 class TestClient(ClusterTestCase):
39
39
40 def test_ids(self):
40 def test_ids(self):
41 n = len(self.client.ids)
41 n = len(self.client.ids)
42 self.add_engines(2)
42 self.add_engines(2)
43 self.assertEquals(len(self.client.ids), n+2)
43 self.assertEquals(len(self.client.ids), n+2)
44
44
45 def test_view_indexing(self):
45 def test_view_indexing(self):
46 """test index access for views"""
46 """test index access for views"""
47 self.minimum_engines(4)
47 self.minimum_engines(4)
48 targets = self.client._build_targets('all')[-1]
48 targets = self.client._build_targets('all')[-1]
49 v = self.client[:]
49 v = self.client[:]
50 self.assertEquals(v.targets, targets)
50 self.assertEquals(v.targets, targets)
51 t = self.client.ids[2]
51 t = self.client.ids[2]
52 v = self.client[t]
52 v = self.client[t]
53 self.assert_(isinstance(v, DirectView))
53 self.assert_(isinstance(v, DirectView))
54 self.assertEquals(v.targets, t)
54 self.assertEquals(v.targets, t)
55 t = self.client.ids[2:4]
55 t = self.client.ids[2:4]
56 v = self.client[t]
56 v = self.client[t]
57 self.assert_(isinstance(v, DirectView))
57 self.assert_(isinstance(v, DirectView))
58 self.assertEquals(v.targets, t)
58 self.assertEquals(v.targets, t)
59 v = self.client[::2]
59 v = self.client[::2]
60 self.assert_(isinstance(v, DirectView))
60 self.assert_(isinstance(v, DirectView))
61 self.assertEquals(v.targets, targets[::2])
61 self.assertEquals(v.targets, targets[::2])
62 v = self.client[1::3]
62 v = self.client[1::3]
63 self.assert_(isinstance(v, DirectView))
63 self.assert_(isinstance(v, DirectView))
64 self.assertEquals(v.targets, targets[1::3])
64 self.assertEquals(v.targets, targets[1::3])
65 v = self.client[:-3]
65 v = self.client[:-3]
66 self.assert_(isinstance(v, DirectView))
66 self.assert_(isinstance(v, DirectView))
67 self.assertEquals(v.targets, targets[:-3])
67 self.assertEquals(v.targets, targets[:-3])
68 v = self.client[-1]
68 v = self.client[-1]
69 self.assert_(isinstance(v, DirectView))
69 self.assert_(isinstance(v, DirectView))
70 self.assertEquals(v.targets, targets[-1])
70 self.assertEquals(v.targets, targets[-1])
71 self.assertRaises(TypeError, lambda : self.client[None])
71 self.assertRaises(TypeError, lambda : self.client[None])
72
72
73 def test_lbview_targets(self):
73 def test_lbview_targets(self):
74 """test load_balanced_view targets"""
74 """test load_balanced_view targets"""
75 v = self.client.load_balanced_view()
75 v = self.client.load_balanced_view()
76 self.assertEquals(v.targets, None)
76 self.assertEquals(v.targets, None)
77 v = self.client.load_balanced_view(-1)
77 v = self.client.load_balanced_view(-1)
78 self.assertEquals(v.targets, [self.client.ids[-1]])
78 self.assertEquals(v.targets, [self.client.ids[-1]])
79 v = self.client.load_balanced_view('all')
79 v = self.client.load_balanced_view('all')
80 self.assertEquals(v.targets, None)
80 self.assertEquals(v.targets, None)
81
81
82 def test_dview_targets(self):
82 def test_dview_targets(self):
83 """test direct_view targets"""
83 """test direct_view targets"""
84 v = self.client.direct_view()
84 v = self.client.direct_view()
85 self.assertEquals(v.targets, 'all')
85 self.assertEquals(v.targets, 'all')
86 v = self.client.direct_view('all')
86 v = self.client.direct_view('all')
87 self.assertEquals(v.targets, 'all')
87 self.assertEquals(v.targets, 'all')
88 v = self.client.direct_view(-1)
88 v = self.client.direct_view(-1)
89 self.assertEquals(v.targets, self.client.ids[-1])
89 self.assertEquals(v.targets, self.client.ids[-1])
90
90
91 def test_lazy_all_targets(self):
91 def test_lazy_all_targets(self):
92 """test lazy evaluation of rc.direct_view('all')"""
92 """test lazy evaluation of rc.direct_view('all')"""
93 v = self.client.direct_view()
93 v = self.client.direct_view()
94 self.assertEquals(v.targets, 'all')
94 self.assertEquals(v.targets, 'all')
95
95
96 def double(x):
96 def double(x):
97 return x*2
97 return x*2
98 seq = range(100)
98 seq = range(100)
99 ref = [ double(x) for x in seq ]
99 ref = [ double(x) for x in seq ]
100
100
101 # add some engines, which should be used
101 # add some engines, which should be used
102 self.add_engines(1)
102 self.add_engines(1)
103 n1 = len(self.client.ids)
103 n1 = len(self.client.ids)
104
104
105 # simple apply
105 # simple apply
106 r = v.apply_sync(lambda : 1)
106 r = v.apply_sync(lambda : 1)
107 self.assertEquals(r, [1] * n1)
107 self.assertEquals(r, [1] * n1)
108
108
109 # map goes through remotefunction
109 # map goes through remotefunction
110 r = v.map_sync(double, seq)
110 r = v.map_sync(double, seq)
111 self.assertEquals(r, ref)
111 self.assertEquals(r, ref)
112
112
113 # add a couple more engines, and try again
113 # add a couple more engines, and try again
114 self.add_engines(2)
114 self.add_engines(2)
115 n2 = len(self.client.ids)
115 n2 = len(self.client.ids)
116 self.assertNotEquals(n2, n1)
116 self.assertNotEquals(n2, n1)
117
117
118 # apply
118 # apply
119 r = v.apply_sync(lambda : 1)
119 r = v.apply_sync(lambda : 1)
120 self.assertEquals(r, [1] * n2)
120 self.assertEquals(r, [1] * n2)
121
121
122 # map
122 # map
123 r = v.map_sync(double, seq)
123 r = v.map_sync(double, seq)
124 self.assertEquals(r, ref)
124 self.assertEquals(r, ref)
125
125
126 def test_targets(self):
126 def test_targets(self):
127 """test various valid targets arguments"""
127 """test various valid targets arguments"""
128 build = self.client._build_targets
128 build = self.client._build_targets
129 ids = self.client.ids
129 ids = self.client.ids
130 idents,targets = build(None)
130 idents,targets = build(None)
131 self.assertEquals(ids, targets)
131 self.assertEquals(ids, targets)
132
132
133 def test_clear(self):
133 def test_clear(self):
134 """test clear behavior"""
134 """test clear behavior"""
135 self.minimum_engines(2)
135 self.minimum_engines(2)
136 v = self.client[:]
136 v = self.client[:]
137 v.block=True
137 v.block=True
138 v.push(dict(a=5))
138 v.push(dict(a=5))
139 v.pull('a')
139 v.pull('a')
140 id0 = self.client.ids[-1]
140 id0 = self.client.ids[-1]
141 self.client.clear(targets=id0, block=True)
141 self.client.clear(targets=id0, block=True)
142 a = self.client[:-1].get('a')
142 a = self.client[:-1].get('a')
143 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
143 self.assertRaisesRemote(NameError, self.client[id0].get, 'a')
144 self.client.clear(block=True)
144 self.client.clear(block=True)
145 for i in self.client.ids:
145 for i in self.client.ids:
146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
146 self.assertRaisesRemote(NameError, self.client[i].get, 'a')
147
147
148 def test_get_result(self):
148 def test_get_result(self):
149 """test getting results from the Hub."""
149 """test getting results from the Hub."""
150 c = clientmod.Client(profile='iptest')
150 c = clientmod.Client(profile='iptest')
151 t = c.ids[-1]
151 t = c.ids[-1]
152 ar = c[t].apply_async(wait, 1)
152 ar = c[t].apply_async(wait, 1)
153 # give the monitor time to notice the message
153 # give the monitor time to notice the message
154 time.sleep(.25)
154 time.sleep(.25)
155 ahr = self.client.get_result(ar.msg_ids)
155 ahr = self.client.get_result(ar.msg_ids)
156 self.assertTrue(isinstance(ahr, AsyncHubResult))
156 self.assertTrue(isinstance(ahr, AsyncHubResult))
157 self.assertEquals(ahr.get(), ar.get())
157 self.assertEquals(ahr.get(), ar.get())
158 ar2 = self.client.get_result(ar.msg_ids)
158 ar2 = self.client.get_result(ar.msg_ids)
159 self.assertFalse(isinstance(ar2, AsyncHubResult))
159 self.assertFalse(isinstance(ar2, AsyncHubResult))
160 c.close()
160 c.close()
161
161
162 def test_get_execute_result(self):
163 """test getting execute results from the Hub."""
164 c = clientmod.Client(profile='iptest')
165 t = c.ids[-1]
166 cell = '\n'.join([
167 'import time',
168 'time.sleep(0.25)',
169 '5'
170 ])
171 ar = c[t].execute("import time; time.sleep(1)", silent=False)
172 # give the monitor time to notice the message
173 time.sleep(.25)
174 ahr = self.client.get_result(ar.msg_ids)
175 self.assertTrue(isinstance(ahr, AsyncHubResult))
176 self.assertEquals(ahr.get().pyout, ar.get().pyout)
177 ar2 = self.client.get_result(ar.msg_ids)
178 self.assertFalse(isinstance(ar2, AsyncHubResult))
179 c.close()
180
162 def test_ids_list(self):
181 def test_ids_list(self):
163 """test client.ids"""
182 """test client.ids"""
164 ids = self.client.ids
183 ids = self.client.ids
165 self.assertEquals(ids, self.client._ids)
184 self.assertEquals(ids, self.client._ids)
166 self.assertFalse(ids is self.client._ids)
185 self.assertFalse(ids is self.client._ids)
167 ids.remove(ids[-1])
186 ids.remove(ids[-1])
168 self.assertNotEquals(ids, self.client._ids)
187 self.assertNotEquals(ids, self.client._ids)
169
188
170 def test_queue_status(self):
189 def test_queue_status(self):
171 ids = self.client.ids
190 ids = self.client.ids
172 id0 = ids[0]
191 id0 = ids[0]
173 qs = self.client.queue_status(targets=id0)
192 qs = self.client.queue_status(targets=id0)
174 self.assertTrue(isinstance(qs, dict))
193 self.assertTrue(isinstance(qs, dict))
175 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
194 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
176 allqs = self.client.queue_status()
195 allqs = self.client.queue_status()
177 self.assertTrue(isinstance(allqs, dict))
196 self.assertTrue(isinstance(allqs, dict))
178 intkeys = list(allqs.keys())
197 intkeys = list(allqs.keys())
179 intkeys.remove('unassigned')
198 intkeys.remove('unassigned')
180 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
199 self.assertEquals(sorted(intkeys), sorted(self.client.ids))
181 unassigned = allqs.pop('unassigned')
200 unassigned = allqs.pop('unassigned')
182 for eid,qs in allqs.items():
201 for eid,qs in allqs.items():
183 self.assertTrue(isinstance(qs, dict))
202 self.assertTrue(isinstance(qs, dict))
184 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
203 self.assertEquals(sorted(qs.keys()), ['completed', 'queue', 'tasks'])
185
204
186 def test_shutdown(self):
205 def test_shutdown(self):
187 ids = self.client.ids
206 ids = self.client.ids
188 id0 = ids[0]
207 id0 = ids[0]
189 self.client.shutdown(id0, block=True)
208 self.client.shutdown(id0, block=True)
190 while id0 in self.client.ids:
209 while id0 in self.client.ids:
191 time.sleep(0.1)
210 time.sleep(0.1)
192 self.client.spin()
211 self.client.spin()
193
212
194 self.assertRaises(IndexError, lambda : self.client[id0])
213 self.assertRaises(IndexError, lambda : self.client[id0])
195
214
196 def test_result_status(self):
215 def test_result_status(self):
197 pass
216 pass
198 # to be written
217 # to be written
199
218
200 def test_db_query_dt(self):
219 def test_db_query_dt(self):
201 """test db query by date"""
220 """test db query by date"""
202 hist = self.client.hub_history()
221 hist = self.client.hub_history()
203 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
222 middle = self.client.db_query({'msg_id' : hist[len(hist)//2]})[0]
204 tic = middle['submitted']
223 tic = middle['submitted']
205 before = self.client.db_query({'submitted' : {'$lt' : tic}})
224 before = self.client.db_query({'submitted' : {'$lt' : tic}})
206 after = self.client.db_query({'submitted' : {'$gte' : tic}})
225 after = self.client.db_query({'submitted' : {'$gte' : tic}})
207 self.assertEquals(len(before)+len(after),len(hist))
226 self.assertEquals(len(before)+len(after),len(hist))
208 for b in before:
227 for b in before:
209 self.assertTrue(b['submitted'] < tic)
228 self.assertTrue(b['submitted'] < tic)
210 for a in after:
229 for a in after:
211 self.assertTrue(a['submitted'] >= tic)
230 self.assertTrue(a['submitted'] >= tic)
212 same = self.client.db_query({'submitted' : tic})
231 same = self.client.db_query({'submitted' : tic})
213 for s in same:
232 for s in same:
214 self.assertTrue(s['submitted'] == tic)
233 self.assertTrue(s['submitted'] == tic)
215
234
216 def test_db_query_keys(self):
235 def test_db_query_keys(self):
217 """test extracting subset of record keys"""
236 """test extracting subset of record keys"""
218 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
237 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
219 for rec in found:
238 for rec in found:
220 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
239 self.assertEquals(set(rec.keys()), set(['msg_id', 'submitted', 'completed']))
221
240
222 def test_db_query_default_keys(self):
241 def test_db_query_default_keys(self):
223 """default db_query excludes buffers"""
242 """default db_query excludes buffers"""
224 found = self.client.db_query({'msg_id': {'$ne' : ''}})
243 found = self.client.db_query({'msg_id': {'$ne' : ''}})
225 for rec in found:
244 for rec in found:
226 keys = set(rec.keys())
245 keys = set(rec.keys())
227 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
246 self.assertFalse('buffers' in keys, "'buffers' should not be in: %s" % keys)
228 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
247 self.assertFalse('result_buffers' in keys, "'result_buffers' should not be in: %s" % keys)
229
248
230 def test_db_query_msg_id(self):
249 def test_db_query_msg_id(self):
231 """ensure msg_id is always in db queries"""
250 """ensure msg_id is always in db queries"""
232 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
251 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted', 'completed'])
233 for rec in found:
252 for rec in found:
234 self.assertTrue('msg_id' in rec.keys())
253 self.assertTrue('msg_id' in rec.keys())
235 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
254 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['submitted'])
236 for rec in found:
255 for rec in found:
237 self.assertTrue('msg_id' in rec.keys())
256 self.assertTrue('msg_id' in rec.keys())
238 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
257 found = self.client.db_query({'msg_id': {'$ne' : ''}},keys=['msg_id'])
239 for rec in found:
258 for rec in found:
240 self.assertTrue('msg_id' in rec.keys())
259 self.assertTrue('msg_id' in rec.keys())
241
260
242 def test_db_query_get_result(self):
261 def test_db_query_get_result(self):
243 """pop in db_query shouldn't pop from result itself"""
262 """pop in db_query shouldn't pop from result itself"""
244 self.client[:].apply_sync(lambda : 1)
263 self.client[:].apply_sync(lambda : 1)
245 found = self.client.db_query({'msg_id': {'$ne' : ''}})
264 found = self.client.db_query({'msg_id': {'$ne' : ''}})
246 rc2 = clientmod.Client(profile='iptest')
265 rc2 = clientmod.Client(profile='iptest')
247 # If this bug is not fixed, this call will hang:
266 # If this bug is not fixed, this call will hang:
248 ar = rc2.get_result(self.client.history[-1])
267 ar = rc2.get_result(self.client.history[-1])
249 ar.wait(2)
268 ar.wait(2)
250 self.assertTrue(ar.ready())
269 self.assertTrue(ar.ready())
251 ar.get()
270 ar.get()
252 rc2.close()
271 rc2.close()
253
272
254 def test_db_query_in(self):
273 def test_db_query_in(self):
255 """test db query with '$in','$nin' operators"""
274 """test db query with '$in','$nin' operators"""
256 hist = self.client.hub_history()
275 hist = self.client.hub_history()
257 even = hist[::2]
276 even = hist[::2]
258 odd = hist[1::2]
277 odd = hist[1::2]
259 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
278 recs = self.client.db_query({ 'msg_id' : {'$in' : even}})
260 found = [ r['msg_id'] for r in recs ]
279 found = [ r['msg_id'] for r in recs ]
261 self.assertEquals(set(even), set(found))
280 self.assertEquals(set(even), set(found))
262 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
281 recs = self.client.db_query({ 'msg_id' : {'$nin' : even}})
263 found = [ r['msg_id'] for r in recs ]
282 found = [ r['msg_id'] for r in recs ]
264 self.assertEquals(set(odd), set(found))
283 self.assertEquals(set(odd), set(found))
265
284
266 def test_hub_history(self):
285 def test_hub_history(self):
267 hist = self.client.hub_history()
286 hist = self.client.hub_history()
268 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
287 recs = self.client.db_query({ 'msg_id' : {"$ne":''}})
269 recdict = {}
288 recdict = {}
270 for rec in recs:
289 for rec in recs:
271 recdict[rec['msg_id']] = rec
290 recdict[rec['msg_id']] = rec
272
291
273 latest = datetime(1984,1,1)
292 latest = datetime(1984,1,1)
274 for msg_id in hist:
293 for msg_id in hist:
275 rec = recdict[msg_id]
294 rec = recdict[msg_id]
276 newt = rec['submitted']
295 newt = rec['submitted']
277 self.assertTrue(newt >= latest)
296 self.assertTrue(newt >= latest)
278 latest = newt
297 latest = newt
279 ar = self.client[-1].apply_async(lambda : 1)
298 ar = self.client[-1].apply_async(lambda : 1)
280 ar.get()
299 ar.get()
281 time.sleep(0.25)
300 time.sleep(0.25)
282 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
301 self.assertEquals(self.client.hub_history()[-1:],ar.msg_ids)
283
302
284 def _wait_for_idle(self):
303 def _wait_for_idle(self):
285 """wait for an engine to become idle, according to the Hub"""
304 """wait for an engine to become idle, according to the Hub"""
286 rc = self.client
305 rc = self.client
287
306
288 # timeout 5s, polling every 100ms
307 # timeout 5s, polling every 100ms
289 qs = rc.queue_status()
308 qs = rc.queue_status()
290 for i in range(50):
309 for i in range(50):
291 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
310 if qs['unassigned'] or any(qs[eid]['tasks'] for eid in rc.ids):
292 time.sleep(0.1)
311 time.sleep(0.1)
293 qs = rc.queue_status()
312 qs = rc.queue_status()
294 else:
313 else:
295 break
314 break
296
315
297 # ensure Hub up to date:
316 # ensure Hub up to date:
298 self.assertEquals(qs['unassigned'], 0)
317 self.assertEquals(qs['unassigned'], 0)
299 for eid in rc.ids:
318 for eid in rc.ids:
300 self.assertEquals(qs[eid]['tasks'], 0)
319 self.assertEquals(qs[eid]['tasks'], 0)
301
320
302
321
303 def test_resubmit(self):
322 def test_resubmit(self):
304 def f():
323 def f():
305 import random
324 import random
306 return random.random()
325 return random.random()
307 v = self.client.load_balanced_view()
326 v = self.client.load_balanced_view()
308 ar = v.apply_async(f)
327 ar = v.apply_async(f)
309 r1 = ar.get(1)
328 r1 = ar.get(1)
310 # give the Hub a chance to notice:
329 # give the Hub a chance to notice:
311 self._wait_for_idle()
330 self._wait_for_idle()
312 ahr = self.client.resubmit(ar.msg_ids)
331 ahr = self.client.resubmit(ar.msg_ids)
313 r2 = ahr.get(1)
332 r2 = ahr.get(1)
314 self.assertFalse(r1 == r2)
333 self.assertFalse(r1 == r2)
315
334
316 def test_resubmit_chain(self):
335 def test_resubmit_chain(self):
317 """resubmit resubmitted tasks"""
336 """resubmit resubmitted tasks"""
318 v = self.client.load_balanced_view()
337 v = self.client.load_balanced_view()
319 ar = v.apply_async(lambda x: x, 'x'*1024)
338 ar = v.apply_async(lambda x: x, 'x'*1024)
320 ar.get()
339 ar.get()
321 self._wait_for_idle()
340 self._wait_for_idle()
322 ars = [ar]
341 ars = [ar]
323
342
324 for i in range(10):
343 for i in range(10):
325 ar = ars[-1]
344 ar = ars[-1]
326 ar2 = self.client.resubmit(ar.msg_ids)
345 ar2 = self.client.resubmit(ar.msg_ids)
327
346
328 [ ar.get() for ar in ars ]
347 [ ar.get() for ar in ars ]
329
348
330 def test_resubmit_header(self):
349 def test_resubmit_header(self):
331 """resubmit shouldn't clobber the whole header"""
350 """resubmit shouldn't clobber the whole header"""
332 def f():
351 def f():
333 import random
352 import random
334 return random.random()
353 return random.random()
335 v = self.client.load_balanced_view()
354 v = self.client.load_balanced_view()
336 v.retries = 1
355 v.retries = 1
337 ar = v.apply_async(f)
356 ar = v.apply_async(f)
338 r1 = ar.get(1)
357 r1 = ar.get(1)
339 # give the Hub a chance to notice:
358 # give the Hub a chance to notice:
340 self._wait_for_idle()
359 self._wait_for_idle()
341 ahr = self.client.resubmit(ar.msg_ids)
360 ahr = self.client.resubmit(ar.msg_ids)
342 ahr.get(1)
361 ahr.get(1)
343 time.sleep(0.5)
362 time.sleep(0.5)
344 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
363 records = self.client.db_query({'msg_id': {'$in': ar.msg_ids + ahr.msg_ids}}, keys='header')
345 h1,h2 = [ r['header'] for r in records ]
364 h1,h2 = [ r['header'] for r in records ]
346 for key in set(h1.keys()).union(set(h2.keys())):
365 for key in set(h1.keys()).union(set(h2.keys())):
347 if key in ('msg_id', 'date'):
366 if key in ('msg_id', 'date'):
348 self.assertNotEquals(h1[key], h2[key])
367 self.assertNotEquals(h1[key], h2[key])
349 else:
368 else:
350 self.assertEquals(h1[key], h2[key])
369 self.assertEquals(h1[key], h2[key])
351
370
352 def test_resubmit_aborted(self):
371 def test_resubmit_aborted(self):
353 def f():
372 def f():
354 import random
373 import random
355 return random.random()
374 return random.random()
356 v = self.client.load_balanced_view()
375 v = self.client.load_balanced_view()
357 # restrict to one engine, so we can put a sleep
376 # restrict to one engine, so we can put a sleep
358 # ahead of the task, so it will get aborted
377 # ahead of the task, so it will get aborted
359 eid = self.client.ids[-1]
378 eid = self.client.ids[-1]
360 v.targets = [eid]
379 v.targets = [eid]
361 sleep = v.apply_async(time.sleep, 0.5)
380 sleep = v.apply_async(time.sleep, 0.5)
362 ar = v.apply_async(f)
381 ar = v.apply_async(f)
363 ar.abort()
382 ar.abort()
364 self.assertRaises(error.TaskAborted, ar.get)
383 self.assertRaises(error.TaskAborted, ar.get)
365 # Give the Hub a chance to get up to date:
384 # Give the Hub a chance to get up to date:
366 self._wait_for_idle()
385 self._wait_for_idle()
367 ahr = self.client.resubmit(ar.msg_ids)
386 ahr = self.client.resubmit(ar.msg_ids)
368 r2 = ahr.get(1)
387 r2 = ahr.get(1)
369
388
370 def test_resubmit_inflight(self):
389 def test_resubmit_inflight(self):
371 """resubmit of inflight task"""
390 """resubmit of inflight task"""
372 v = self.client.load_balanced_view()
391 v = self.client.load_balanced_view()
373 ar = v.apply_async(time.sleep,1)
392 ar = v.apply_async(time.sleep,1)
374 # give the message a chance to arrive
393 # give the message a chance to arrive
375 time.sleep(0.2)
394 time.sleep(0.2)
376 ahr = self.client.resubmit(ar.msg_ids)
395 ahr = self.client.resubmit(ar.msg_ids)
377 ar.get(2)
396 ar.get(2)
378 ahr.get(2)
397 ahr.get(2)
379
398
380 def test_resubmit_badkey(self):
399 def test_resubmit_badkey(self):
381 """ensure KeyError on resubmit of nonexistant task"""
400 """ensure KeyError on resubmit of nonexistant task"""
382 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
401 self.assertRaisesRemote(KeyError, self.client.resubmit, ['invalid'])
383
402
384 def test_purge_results(self):
403 def test_purge_results(self):
385 # ensure there are some tasks
404 # ensure there are some tasks
386 for i in range(5):
405 for i in range(5):
387 self.client[:].apply_sync(lambda : 1)
406 self.client[:].apply_sync(lambda : 1)
388 # Wait for the Hub to realise the result is done:
407 # Wait for the Hub to realise the result is done:
389 # This prevents a race condition, where we
408 # This prevents a race condition, where we
390 # might purge a result the Hub still thinks is pending.
409 # might purge a result the Hub still thinks is pending.
391 time.sleep(0.1)
410 time.sleep(0.1)
392 rc2 = clientmod.Client(profile='iptest')
411 rc2 = clientmod.Client(profile='iptest')
393 hist = self.client.hub_history()
412 hist = self.client.hub_history()
394 ahr = rc2.get_result([hist[-1]])
413 ahr = rc2.get_result([hist[-1]])
395 ahr.wait(10)
414 ahr.wait(10)
396 self.client.purge_results(hist[-1])
415 self.client.purge_results(hist[-1])
397 newhist = self.client.hub_history()
416 newhist = self.client.hub_history()
398 self.assertEquals(len(newhist)+1,len(hist))
417 self.assertEquals(len(newhist)+1,len(hist))
399 rc2.spin()
418 rc2.spin()
400 rc2.close()
419 rc2.close()
401
420
402 def test_purge_all_results(self):
421 def test_purge_all_results(self):
403 self.client.purge_results('all')
422 self.client.purge_results('all')
404 hist = self.client.hub_history()
423 hist = self.client.hub_history()
405 self.assertEquals(len(hist), 0)
424 self.assertEquals(len(hist), 0)
406
425
407 def test_spin_thread(self):
426 def test_spin_thread(self):
408 self.client.spin_thread(0.01)
427 self.client.spin_thread(0.01)
409 ar = self.client[-1].apply_async(lambda : 1)
428 ar = self.client[-1].apply_async(lambda : 1)
410 time.sleep(0.1)
429 time.sleep(0.1)
411 self.assertTrue(ar.wall_time < 0.1,
430 self.assertTrue(ar.wall_time < 0.1,
412 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
431 "spin should have kept wall_time < 0.1, but got %f" % ar.wall_time
413 )
432 )
414
433
415 def test_stop_spin_thread(self):
434 def test_stop_spin_thread(self):
416 self.client.spin_thread(0.01)
435 self.client.spin_thread(0.01)
417 self.client.stop_spin_thread()
436 self.client.stop_spin_thread()
418 ar = self.client[-1].apply_async(lambda : 1)
437 ar = self.client[-1].apply_async(lambda : 1)
419 time.sleep(0.15)
438 time.sleep(0.15)
420 self.assertTrue(ar.wall_time > 0.1,
439 self.assertTrue(ar.wall_time > 0.1,
421 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
440 "Shouldn't be spinning, but got wall_time=%f" % ar.wall_time
422 )
441 )
423
442
424 def test_activate(self):
443 def test_activate(self):
425 ip = get_ipython()
444 ip = get_ipython()
426 magics = ip.magics_manager.magics
445 magics = ip.magics_manager.magics
427 self.assertTrue('px' in magics['line'])
446 self.assertTrue('px' in magics['line'])
428 self.assertTrue('px' in magics['cell'])
447 self.assertTrue('px' in magics['cell'])
429 v0 = self.client.activate(-1, '0')
448 v0 = self.client.activate(-1, '0')
430 self.assertTrue('px0' in magics['line'])
449 self.assertTrue('px0' in magics['line'])
431 self.assertTrue('px0' in magics['cell'])
450 self.assertTrue('px0' in magics['cell'])
432 self.assertEquals(v0.targets, self.client.ids[-1])
451 self.assertEquals(v0.targets, self.client.ids[-1])
433 v0 = self.client.activate('all', 'all')
452 v0 = self.client.activate('all', 'all')
434 self.assertTrue('pxall' in magics['line'])
453 self.assertTrue('pxall' in magics['line'])
435 self.assertTrue('pxall' in magics['cell'])
454 self.assertTrue('pxall' in magics['cell'])
436 self.assertEquals(v0.targets, 'all')
455 self.assertEquals(v0.targets, 'all')
General Comments 0
You need to be logged in to leave comments. Login now