##// END OF EJS Templates
Don't set explicit IDENTITY for clients...
MinRK -
Show More
@@ -1,1721 +1,1713 b''
1 """A semi-synchronous Client for the ZMQ cluster
1 """A semi-synchronous Client for the ZMQ cluster
2
2
3 Authors:
3 Authors:
4
4
5 * MinRK
5 * MinRK
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import os
18 import os
19 import json
19 import json
20 import sys
20 import sys
21 from threading import Thread, Event
21 from threading import Thread, Event
22 import time
22 import time
23 import warnings
23 import warnings
24 from datetime import datetime
24 from datetime import datetime
25 from getpass import getpass
25 from getpass import getpass
26 from pprint import pprint
26 from pprint import pprint
27
27
28 pjoin = os.path.join
28 pjoin = os.path.join
29
29
30 import zmq
30 import zmq
31 # from zmq.eventloop import ioloop, zmqstream
31 # from zmq.eventloop import ioloop, zmqstream
32
32
33 from IPython.config.configurable import MultipleInstanceError
33 from IPython.config.configurable import MultipleInstanceError
34 from IPython.core.application import BaseIPythonApplication
34 from IPython.core.application import BaseIPythonApplication
35 from IPython.core.profiledir import ProfileDir, ProfileDirError
35 from IPython.core.profiledir import ProfileDir, ProfileDirError
36
36
37 from IPython.utils.coloransi import TermColors
37 from IPython.utils.coloransi import TermColors
38 from IPython.utils.jsonutil import rekey
38 from IPython.utils.jsonutil import rekey
39 from IPython.utils.localinterfaces import LOCAL_IPS
39 from IPython.utils.localinterfaces import LOCAL_IPS
40 from IPython.utils.path import get_ipython_dir
40 from IPython.utils.path import get_ipython_dir
41 from IPython.utils.py3compat import cast_bytes
41 from IPython.utils.py3compat import cast_bytes
42 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
42 from IPython.utils.traitlets import (HasTraits, Integer, Instance, Unicode,
43 Dict, List, Bool, Set, Any)
43 Dict, List, Bool, Set, Any)
44 from IPython.external.decorator import decorator
44 from IPython.external.decorator import decorator
45 from IPython.external.ssh import tunnel
45 from IPython.external.ssh import tunnel
46
46
47 from IPython.parallel import Reference
47 from IPython.parallel import Reference
48 from IPython.parallel import error
48 from IPython.parallel import error
49 from IPython.parallel import util
49 from IPython.parallel import util
50
50
51 from IPython.zmq.session import Session, Message
51 from IPython.zmq.session import Session, Message
52
52
53 from .asyncresult import AsyncResult, AsyncHubResult
53 from .asyncresult import AsyncResult, AsyncHubResult
54 from .view import DirectView, LoadBalancedView
54 from .view import DirectView, LoadBalancedView
55
55
56 if sys.version_info[0] >= 3:
56 if sys.version_info[0] >= 3:
57 # xrange is used in a couple 'isinstance' tests in py2
57 # xrange is used in a couple 'isinstance' tests in py2
58 # should be just 'range' in 3k
58 # should be just 'range' in 3k
59 xrange = range
59 xrange = range
60
60
61 #--------------------------------------------------------------------------
61 #--------------------------------------------------------------------------
62 # Decorators for Client methods
62 # Decorators for Client methods
63 #--------------------------------------------------------------------------
63 #--------------------------------------------------------------------------
64
64
@decorator
def spin_first(f, self, *args, **kwargs):
    """Decorator helper: flush incoming messages via spin() before running f."""
    self.spin()
    result = f(self, *args, **kwargs)
    return result
70
70
71
71
72 #--------------------------------------------------------------------------
72 #--------------------------------------------------------------------------
73 # Classes
73 # Classes
74 #--------------------------------------------------------------------------
74 #--------------------------------------------------------------------------
75
75
76
76
class ExecuteReply(object):
    """Wrapper around a finished ``execute_reply`` message.

    Provides item/attribute access to the reply metadata, plus the
    ``_repr_*_`` display hooks IPython uses to render the engine's
    pyout (display data) in the various frontends.

    Parameters
    ----------
    msg_id : str
        The msg_id of the request that produced this reply.
    content : dict
        The reply message's content dict (must contain 'execution_count').
    metadata : dict
        The Metadata dict accumulated for this request.
    """
    def __init__(self, msg_id, content, metadata):
        self.msg_id = msg_id
        self._content = content
        self.execution_count = content['execution_count']
        self.metadata = metadata

    def _data(self):
        """Return the display-data dict of the pyout message.

        Falls back to an empty dict when there was no pyout, so callers
        can unconditionally use ``.get()``.
        """
        pyout = self.metadata['pyout'] or {'data': {}}
        return pyout['data']

    def __getitem__(self, key):
        return self.metadata[key]

    def __getattr__(self, key):
        # only called when normal attribute lookup fails; fall back to
        # the metadata dict, so e.g. `reply.engine_id` works.
        if key not in self.metadata:
            raise AttributeError(key)
        return self.metadata[key]

    def __repr__(self):
        text_out = self._data().get('text/plain', '')
        if len(text_out) > 32:
            # keep the repr short: truncate to 32 chars incl. ellipsis
            text_out = text_out[:29] + '...'

        return "<ExecuteReply[%i]: %s>" % (self.execution_count, text_out)

    def _repr_pretty_(self, p, cycle):
        """IPython pretty-print hook: render like a colored Out[engine:count] prompt."""
        text_out = self._data().get('text/plain', '')

        if not text_out:
            return

        try:
            ip = get_ipython()
        except NameError:
            colors = "NoColor"
        else:
            colors = ip.colors

        if colors == "NoColor":
            out = normal = ""
        else:
            out = TermColors.Red
            normal = TermColors.Normal

        if '\n' in text_out and not text_out.startswith('\n'):
            # add newline for multiline reprs
            text_out = '\n' + text_out

        p.text(
            out + u'Out[%i:%i]: ' % (
                self.metadata['engine_id'], self.execution_count
            ) + normal + text_out
        )

    def _repr_html_(self):
        return self._data().get("text/html")

    def _repr_latex_(self):
        return self._data().get("text/latex")

    def _repr_json_(self):
        return self._data().get("application/json")

    def _repr_javascript_(self):
        return self._data().get("application/javascript")

    def _repr_png_(self):
        return self._data().get("image/png")

    def _repr_jpeg_(self):
        return self._data().get("image/jpeg")

    def _repr_svg_(self):
        return self._data().get("image/svg+xml")
158
158
159
159
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        # the fixed schema: these are the only keys this dict may hold
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,

              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
              'outputs' : [],
              'outputs_ready' : False,
            }
        self.update(md)
        # NOTE(review): dict.update does not call __setitem__, so
        # constructor arguments bypass the strict-key check below.
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # direct containment test is O(1) and works on Python 3;
        # the previous `key in self.iterkeys()` was py2-only and O(n)
        if key in self:
            return self[key]
        else:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key in self:
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
212
212
213
213
214 class Client(HasTraits):
214 class Client(HasTraits):
215 """A semi-synchronous client to the IPython ZMQ cluster
215 """A semi-synchronous client to the IPython ZMQ cluster
216
216
217 Parameters
217 Parameters
218 ----------
218 ----------
219
219
220 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
220 url_or_file : bytes or unicode; zmq url or path to ipcontroller-client.json
221 Connection information for the Hub's registration. If a json connector
221 Connection information for the Hub's registration. If a json connector
222 file is given, then likely no further configuration is necessary.
222 file is given, then likely no further configuration is necessary.
223 [Default: use profile]
223 [Default: use profile]
224 profile : bytes
224 profile : bytes
225 The name of the Cluster profile to be used to find connector information.
225 The name of the Cluster profile to be used to find connector information.
226 If run from an IPython application, the default profile will be the same
226 If run from an IPython application, the default profile will be the same
227 as the running application, otherwise it will be 'default'.
227 as the running application, otherwise it will be 'default'.
228 context : zmq.Context
228 context : zmq.Context
229 Pass an existing zmq.Context instance, otherwise the client will create its own.
229 Pass an existing zmq.Context instance, otherwise the client will create its own.
230 debug : bool
230 debug : bool
231 flag for lots of message printing for debug purposes
231 flag for lots of message printing for debug purposes
232 timeout : int/float
232 timeout : int/float
233 time (in seconds) to wait for connection replies from the Hub
233 time (in seconds) to wait for connection replies from the Hub
234 [Default: 10]
234 [Default: 10]
235
235
236 #-------------- session related args ----------------
236 #-------------- session related args ----------------
237
237
238 config : Config object
238 config : Config object
239 If specified, this will be relayed to the Session for configuration
239 If specified, this will be relayed to the Session for configuration
240 username : str
240 username : str
241 set username for the session object
241 set username for the session object
242 packer : str (import_string) or callable
242 packer : str (import_string) or callable
243 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
243 Can be either the simple keyword 'json' or 'pickle', or an import_string to a
244 function to serialize messages. Must support same input as
244 function to serialize messages. Must support same input as
245 JSON, and output must be bytes.
245 JSON, and output must be bytes.
246 You can pass a callable directly as `pack`
246 You can pass a callable directly as `pack`
247 unpacker : str (import_string) or callable
247 unpacker : str (import_string) or callable
248 The inverse of packer. Only necessary if packer is specified as *not* one
248 The inverse of packer. Only necessary if packer is specified as *not* one
249 of 'json' or 'pickle'.
249 of 'json' or 'pickle'.
250
250
251 #-------------- ssh related args ----------------
251 #-------------- ssh related args ----------------
252 # These are args for configuring the ssh tunnel to be used
252 # These are args for configuring the ssh tunnel to be used
253 # credentials are used to forward connections over ssh to the Controller
253 # credentials are used to forward connections over ssh to the Controller
254 # Note that the ip given in `addr` needs to be relative to sshserver
254 # Note that the ip given in `addr` needs to be relative to sshserver
255 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
255 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
256 # and set sshserver as the same machine the Controller is on. However,
256 # and set sshserver as the same machine the Controller is on. However,
257 # the only requirement is that sshserver is able to see the Controller
257 # the only requirement is that sshserver is able to see the Controller
258 # (i.e. is within the same trusted network).
258 # (i.e. is within the same trusted network).
259
259
260 sshserver : str
260 sshserver : str
261 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
261 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
262 If keyfile or password is specified, and this is not, it will default to
262 If keyfile or password is specified, and this is not, it will default to
263 the ip given in addr.
263 the ip given in addr.
264 sshkey : str; path to ssh private key file
264 sshkey : str; path to ssh private key file
265 This specifies a key to be used in ssh login, default None.
265 This specifies a key to be used in ssh login, default None.
266 Regular default ssh keys will be used without specifying this argument.
266 Regular default ssh keys will be used without specifying this argument.
267 password : str
267 password : str
268 Your ssh password to sshserver. Note that if this is left None,
268 Your ssh password to sshserver. Note that if this is left None,
269 you will be prompted for it if passwordless key based login is unavailable.
269 you will be prompted for it if passwordless key based login is unavailable.
270 paramiko : bool
270 paramiko : bool
271 flag for whether to use paramiko instead of shell ssh for tunneling.
271 flag for whether to use paramiko instead of shell ssh for tunneling.
272 [default: True on win32, False else]
272 [default: True on win32, False else]
273
273
274 ------- exec authentication args -------
274 ------- exec authentication args -------
275 If even localhost is untrusted, you can have some protection against
275 If even localhost is untrusted, you can have some protection against
276 unauthorized execution by signing messages with HMAC digests.
276 unauthorized execution by signing messages with HMAC digests.
277 Messages are still sent as cleartext, so if someone can snoop your
277 Messages are still sent as cleartext, so if someone can snoop your
278 loopback traffic this will not protect your privacy, but will prevent
278 loopback traffic this will not protect your privacy, but will prevent
279 unauthorized execution.
279 unauthorized execution.
280
280
281 exec_key : str
281 exec_key : str
282 an authentication key or file containing a key
282 an authentication key or file containing a key
283 default: None
283 default: None
284
284
285
285
286 Attributes
286 Attributes
287 ----------
287 ----------
288
288
289 ids : list of int engine IDs
289 ids : list of int engine IDs
290 requesting the ids attribute always synchronizes
290 requesting the ids attribute always synchronizes
291 the registration state. To request ids without synchronization,
291 the registration state. To request ids without synchronization,
292 use semi-private _ids attributes.
292 use semi-private _ids attributes.
293
293
294 history : list of msg_ids
294 history : list of msg_ids
295 a list of msg_ids, keeping track of all the execution
295 a list of msg_ids, keeping track of all the execution
296 messages you have submitted in order.
296 messages you have submitted in order.
297
297
298 outstanding : set of msg_ids
298 outstanding : set of msg_ids
299 a set of msg_ids that have been submitted, but whose
299 a set of msg_ids that have been submitted, but whose
300 results have not yet been received.
300 results have not yet been received.
301
301
302 results : dict
302 results : dict
303 a dict of all our results, keyed by msg_id
303 a dict of all our results, keyed by msg_id
304
304
305 block : bool
305 block : bool
306 determines default behavior when block not specified
306 determines default behavior when block not specified
307 in execution methods
307 in execution methods
308
308
309 Methods
309 Methods
310 -------
310 -------
311
311
312 spin
312 spin
313 flushes incoming results and registration state changes
313 flushes incoming results and registration state changes
314 control methods spin, and requesting `ids` also ensures up to date
314 control methods spin, and requesting `ids` also ensures up to date
315
315
316 wait
316 wait
317 wait on one or more msg_ids
317 wait on one or more msg_ids
318
318
319 execution methods
319 execution methods
320 apply
320 apply
321 legacy: execute, run
321 legacy: execute, run
322
322
323 data movement
323 data movement
324 push, pull, scatter, gather
324 push, pull, scatter, gather
325
325
326 query methods
326 query methods
327 queue_status, get_result, purge, result_status
327 queue_status, get_result, purge, result_status
328
328
329 control methods
329 control methods
330 abort, shutdown
330 abort, shutdown
331
331
332 """
332 """
333
333
334
334
    # default blocking behavior for execution methods
    block = Bool(False)
    # msg_ids submitted but whose results have not yet been received
    outstanding = Set()
    # msg_id -> result, for every result received
    results = Instance('collections.defaultdict', (dict,))
    # msg_id -> Metadata for each request
    metadata = Instance('collections.defaultdict', (Metadata,))
    # all execution msg_ids ever submitted, in order
    history = List()
    # verbose message printing for debugging
    debug = Bool(False)
    # NOTE(review): presumably a background Thread that calls spin();
    # its creation is not visible in this chunk -- confirm elsewhere
    _spin_thread = Any()
    # threading.Event (created in __init__); signals the spin thread to stop
    _stop_spinning = Any()

    profile=Unicode()
345 def _profile_default(self):
345 def _profile_default(self):
346 if BaseIPythonApplication.initialized():
346 if BaseIPythonApplication.initialized():
347 # an IPython app *might* be running, try to get its profile
347 # an IPython app *might* be running, try to get its profile
348 try:
348 try:
349 return BaseIPythonApplication.instance().profile
349 return BaseIPythonApplication.instance().profile
350 except (AttributeError, MultipleInstanceError):
350 except (AttributeError, MultipleInstanceError):
351 # could be a *different* subclass of config.Application,
351 # could be a *different* subclass of config.Application,
352 # which would raise one of these two errors.
352 # which would raise one of these two errors.
353 return u'default'
353 return u'default'
354 else:
354 else:
355 return u'default'
355 return u'default'
356
356
357
357
    # NOTE(review): presumably engine-uuid -> set of outstanding msg_ids -- confirm
    _outstanding_dict = Instance('collections.defaultdict', (set,))
    # engine ids, unsynchronized (use the `ids` property for a fresh view)
    _ids = List()
    # NOTE(review): looks like a connected-to-Hub flag; not set in this chunk
    _connected=Bool(False)
    # whether connections are tunneled over ssh (set in __init__)
    _ssh=Bool(False)
    # the zmq Context (shared instance unless one is passed in)
    _context = Instance('zmq.Context')
    # connection config assembled from args + the JSON connection file
    _config = Dict()
    # bidirectional engine id <-> uuid mapping
    _engines=Instance(util.ReverseDict, (), {})
    # _hub_socket=Instance('zmq.Socket')
    # DEALER socket used for queries to the Hub (created in __init__)
    _query_socket=Instance('zmq.Socket')
    _control_socket=Instance('zmq.Socket')
    _iopub_socket=Instance('zmq.Socket')
    _notification_socket=Instance('zmq.Socket')
    _mux_socket=Instance('zmq.Socket')
    _task_socket=Instance('zmq.Socket')
    # NOTE(review): presumably the task scheduler's scheme name -- confirm
    _task_scheme=Unicode()
    # True once close() has been called
    _closed = False
    _ignored_control_replies=Integer(0)
    _ignored_hub_replies=Integer(0)
376
376
377 def __new__(self, *args, **kw):
377 def __new__(self, *args, **kw):
378 # don't raise on positional args
378 # don't raise on positional args
379 return HasTraits.__new__(self, **kw)
379 return HasTraits.__new__(self, **kw)
380
380
    def __init__(self, url_or_file=None, profile=None, profile_dir=None, ipython_dir=None,
            context=None, debug=False, exec_key=None,
            sshserver=None, sshkey=None, password=None, paramiko=None,
            timeout=10, **extra_args
            ):
        """Create a Client connected to a Hub.

        Resolves connection info from `url_or_file` (a zmq url, or a path
        to an ipcontroller-client.json connection file, defaulting to the
        profile's security dir), optionally sets up an SSH tunnel, builds
        the Session used for signing messages, connects the query socket,
        registers with the Hub, and finally activates parallel magics
        when running inside IPython.
        """
        # only pass `profile` up if explicitly given, so the trait default
        # (_profile_default) applies otherwise
        if profile:
            super(Client, self).__init__(debug=debug, profile=profile)
        else:
            super(Client, self).__init__(debug=debug)
        if context is None:
            context = zmq.Context.instance()
        self._context = context
        self._stop_spinning = Event()

        # sets self._cd (the profile dir, or None)
        self._setup_profile_dir(self.profile, profile_dir, ipython_dir)
        if self._cd is not None:
            if url_or_file is None:
                url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
        if url_or_file is None:
            raise ValueError(
                "I can't find enough information to connect to a hub!"
                " Please specify at least one of url_or_file or profile."
            )

        if not util.is_url(url_or_file):
            # it's not a url, try for a file
            if not os.path.exists(url_or_file):
                # bare filenames are resolved relative to the security dir
                if self._cd:
                    url_or_file = os.path.join(self._cd.security_dir, url_or_file)
                if not os.path.exists(url_or_file):
                    raise IOError("Connection file not found: %r" % url_or_file)
            with open(url_or_file) as f:
                cfg = json.loads(f.read())
        else:
            cfg = {'url':url_or_file}

        # sync defaults from args, json:
        # explicit constructor args take precedence over the JSON file
        if sshserver:
            cfg['ssh'] = sshserver
        if exec_key:
            cfg['exec_key'] = exec_key
        exec_key = cfg['exec_key']
        location = cfg.setdefault('location', None)
        cfg['url'] = util.disambiguate_url(cfg['url'], location)
        url = cfg['url']
        proto,addr,port = util.split_url(url)
        if location is not None and addr == '127.0.0.1':
            # location specified, and connection is expected to be local
            if location not in LOCAL_IPS and not sshserver:
                # load ssh from JSON *only* if the controller is not on
                # this machine
                sshserver=cfg['ssh']
            if location not in LOCAL_IPS and not sshserver:
                # warn if no ssh specified, but SSH is probably needed
                # This is only a warning, because the most likely cause
                # is a local Controller on a laptop whose IP is dynamic
                warnings.warn("""
            Controller appears to be listening on localhost, but not on this machine.
            If this is true, you should specify Client(...,sshserver='you@%s')
            or instruct your controller to listen on an external IP."""%location,
                RuntimeWarning)
        elif not sshserver:
            # otherwise sync with cfg
            sshserver = cfg['ssh']

        self._config = cfg

        self._ssh = bool(sshserver or sshkey or password)
        if self._ssh and sshserver is None:
            # default to ssh via localhost
            sshserver = url.split('://')[1].split(':')[0]
        if self._ssh and password is None:
            if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
                # False (not None) marks "no password needed"
                password=False
            else:
                password = getpass("SSH Password for %s: "%sshserver)
        ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)

        # configure and construct the session
        if exec_key is not None:
            # exec_key may be either a key value or a path to a keyfile
            if os.path.isfile(exec_key):
                extra_args['keyfile'] = exec_key
            else:
                exec_key = cast_bytes(exec_key)
                extra_args['key'] = exec_key
        self.session = Session(**extra_args)

        # no explicit IDENTITY is set on the socket; zmq assigns one
        self._query_socket = self._context.socket(zmq.DEALER)

        if self._ssh:
            tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
        else:
            self._query_socket.connect(url)

        self.session.debug = self.debug

        # dispatch tables for messages arriving from the Hub/engines
        self._notification_handlers = {'registration_notification' : self._register_engine,
                                    'unregistration_notification' : self._unregister_engine,
                                    'shutdown_notification' : lambda msg: self.close(),
                                    }
        self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
                                'apply_reply' : self._handle_apply_reply}
        self._connect(sshserver, ssh_kwargs, timeout)

        # last step: setup magics, if we are in IPython:

        try:
            ip = get_ipython()
        except NameError:
            return
        else:
            if 'px' not in ip.magics_manager.magics:
                # in IPython but we are the first Client.
                # activate a default view for parallel magics.
                self.activate()
496
496
497 def __del__(self):
497 def __del__(self):
498 """cleanup sockets, but _not_ context."""
498 """cleanup sockets, but _not_ context."""
499 self.close()
499 self.close()
500
500
def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
    """Locate the cluster's profile directory and store it on ``self._cd``.

    An explicit ``profile_dir`` wins over a named ``profile``; if neither
    can be resolved, ``self._cd`` is left as None.
    """
    if ipython_dir is None:
        ipython_dir = get_ipython_dir()
    # default to "not found"; overwritten below on success
    self._cd = None
    if profile_dir is not None:
        try:
            self._cd = ProfileDir.find_profile_dir(profile_dir)
        except ProfileDirError:
            pass
    elif profile is not None:
        try:
            self._cd = ProfileDir.find_profile_dir_by_name(ipython_dir, profile)
        except ProfileDirError:
            pass
518
518
def _update_engines(self, engines):
    """Merge a ``{id: uuid}`` mapping into our engines dict and _ids list."""
    for engine_id, uuid in engines.iteritems():
        engine_id = int(engine_id)
        self._engines[engine_id] = uuid
        self._ids.append(engine_id)
    self._ids = sorted(self._ids)
    # a pure ZMQ scheduler requires a contiguous 0..N-1 id range;
    # if there is a gap, task farming must be shut down
    gap = sorted(self._engines.keys()) != range(len(self._engines))
    if gap and self._task_scheme == 'pure' and self._task_socket:
        self._stop_scheduling_tasks()
529
529
530 def _stop_scheduling_tasks(self):
530 def _stop_scheduling_tasks(self):
531 """Stop scheduling tasks because an engine has been unregistered
531 """Stop scheduling tasks because an engine has been unregistered
532 from a pure ZMQ scheduler.
532 from a pure ZMQ scheduler.
533 """
533 """
534 self._task_socket.close()
534 self._task_socket.close()
535 self._task_socket = None
535 self._task_socket = None
536 msg = "An engine has been unregistered, and we are using pure " +\
536 msg = "An engine has been unregistered, and we are using pure " +\
537 "ZMQ task scheduling. Task farming will be disabled."
537 "ZMQ task scheduling. Task farming will be disabled."
538 if self.outstanding:
538 if self.outstanding:
539 msg += " If you were running tasks when this happened, " +\
539 msg += " If you were running tasks when this happened, " +\
540 "some `outstanding` msg_ids may never resolve."
540 "some `outstanding` msg_ids may never resolve."
541 warnings.warn(msg, RuntimeWarning)
541 warnings.warn(msg, RuntimeWarning)
542
542
def _build_targets(self, targets):
    """Normalize `targets` into two parallel lists: (uuids, int_ids).

    Accepts None (all engines), 'all', an int (negative allowed),
    a slice, or a collection of ints.
    """
    if not self._ids:
        # flush notification socket if no engines yet, just in case
        if not self.ids:
            raise error.NoEnginesRegistered("Can't build targets without any engines")

    if targets is None:
        targets = self._ids
    elif isinstance(targets, basestring):
        if targets.lower() != 'all':
            raise TypeError("%r not valid str target, must be 'all'"%(targets))
        targets = self._ids
    elif isinstance(targets, int):
        if targets < 0:
            # negative index counts from the end of the current id list
            targets = self.ids[targets]
        if targets not in self._ids:
            raise IndexError("No such engine: %i"%targets)
        targets = [targets]

    if isinstance(targets, slice):
        picked = range(len(self._ids))[targets]
        current = self.ids
        targets = [current[i] for i in picked]

    if not isinstance(targets, (tuple, list, xrange)):
        raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))

    return [cast_bytes(self._engines[t]) for t in targets], list(targets)
575
575
def _connect(self, sshserver, ssh_kwargs, timeout):
    """Set up all our socket connections to the cluster.

    Called once from __init__; a second call is a no-op. Sends a
    connection_request to the Hub over the query socket, waits up to
    `timeout` seconds for the reply, then connects the mux/task/
    notification/control/iopub sockets advertised in the reply.

    Raises
    ------
    error.TimeoutError
        If the Hub does not reply within `timeout` seconds.
    Exception
        If the Hub replies with a non-ok status.
    """
    # Maybe allow reconnecting?
    if self._connected:
        return
    self._connected = True

    def connect_socket(s, url):
        # resolve relative/ambiguous addresses against the Hub's location
        url = util.disambiguate_url(url, self._config['location'])
        if self._ssh:
            return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
        else:
            return s.connect(url)

    self.session.send(self._query_socket, 'connection_request')
    # use Poller because zmq.select has wrong units in pyzmq 2.1.7
    poller = zmq.Poller()
    poller.register(self._query_socket, zmq.POLLIN)
    # poll expects milliseconds, timeout is seconds
    evts = poller.poll(timeout*1000)
    if not evts:
        raise error.TimeoutError("Hub connection request timed out")
    idents, msg = self.session.recv(self._query_socket, mode=0)
    if self.debug:
        pprint(msg)
    msg = Message(msg)
    content = msg.content
    self._config['registration'] = dict(content)
    if content.status == 'ok':
        # NOTE: no explicit zmq.IDENTITY is set on any of these DEALER
        # sockets; libzmq assigns identities, so the removed
        # `ident = self.session.bsession` local is no longer needed.
        if content.mux:
            self._mux_socket = self._context.socket(zmq.DEALER)
            connect_socket(self._mux_socket, content.mux)
        if content.task:
            self._task_scheme, task_addr = content.task
            self._task_socket = self._context.socket(zmq.DEALER)
            connect_socket(self._task_socket, task_addr)
        if content.notification:
            self._notification_socket = self._context.socket(zmq.SUB)
            connect_socket(self._notification_socket, content.notification)
            self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
        if content.control:
            self._control_socket = self._context.socket(zmq.DEALER)
            connect_socket(self._control_socket, content.control)
        if content.iopub:
            self._iopub_socket = self._context.socket(zmq.SUB)
            self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
            connect_socket(self._iopub_socket, content.iopub)
        self._update_engines(dict(content.engines))
    else:
        self._connected = False
        raise Exception("Failed to connect!")
638
630
639 #--------------------------------------------------------------------------
631 #--------------------------------------------------------------------------
640 # handlers and callbacks for incoming messages
632 # handlers and callbacks for incoming messages
641 #--------------------------------------------------------------------------
633 #--------------------------------------------------------------------------
642
634
def _unwrap_exception(self, content):
    """unwrap exception, and remap engine_id to int."""
    exc = error.unwrap_exception(content)
    if exc.engine_info:
        uuid = exc.engine_info['engine_uuid']
        # translate the engine uuid back into its integer id
        exc.engine_info['engine_id'] = self._engines[uuid]
    return exc
652
644
653 def _extract_metadata(self, header, parent, content):
645 def _extract_metadata(self, header, parent, content):
654 md = {'msg_id' : parent['msg_id'],
646 md = {'msg_id' : parent['msg_id'],
655 'received' : datetime.now(),
647 'received' : datetime.now(),
656 'engine_uuid' : header.get('engine', None),
648 'engine_uuid' : header.get('engine', None),
657 'follow' : parent.get('follow', []),
649 'follow' : parent.get('follow', []),
658 'after' : parent.get('after', []),
650 'after' : parent.get('after', []),
659 'status' : content['status'],
651 'status' : content['status'],
660 }
652 }
661
653
662 if md['engine_uuid'] is not None:
654 if md['engine_uuid'] is not None:
663 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
655 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
664
656
665 if 'date' in parent:
657 if 'date' in parent:
666 md['submitted'] = parent['date']
658 md['submitted'] = parent['date']
667 if 'started' in header:
659 if 'started' in header:
668 md['started'] = header['started']
660 md['started'] = header['started']
669 if 'date' in header:
661 if 'date' in header:
670 md['completed'] = header['date']
662 md['completed'] = header['date']
671 return md
663 return md
672
664
673 def _register_engine(self, msg):
665 def _register_engine(self, msg):
674 """Register a new engine, and update our connection info."""
666 """Register a new engine, and update our connection info."""
675 content = msg['content']
667 content = msg['content']
676 eid = content['id']
668 eid = content['id']
677 d = {eid : content['queue']}
669 d = {eid : content['queue']}
678 self._update_engines(d)
670 self._update_engines(d)
679
671
680 def _unregister_engine(self, msg):
672 def _unregister_engine(self, msg):
681 """Unregister an engine that has died."""
673 """Unregister an engine that has died."""
682 content = msg['content']
674 content = msg['content']
683 eid = int(content['id'])
675 eid = int(content['id'])
684 if eid in self._ids:
676 if eid in self._ids:
685 self._ids.remove(eid)
677 self._ids.remove(eid)
686 uuid = self._engines.pop(eid)
678 uuid = self._engines.pop(eid)
687
679
688 self._handle_stranded_msgs(eid, uuid)
680 self._handle_stranded_msgs(eid, uuid)
689
681
690 if self._task_socket and self._task_scheme == 'pure':
682 if self._task_socket and self._task_scheme == 'pure':
691 self._stop_scheduling_tasks()
683 self._stop_scheduling_tasks()
692
684
def _handle_stranded_msgs(self, eid, uuid):
    """Fail messages known to be on an engine when the engine unregisters.

    It is possible that this fires prematurely: an engine can go down
    after completing a result, and the client may see the unregistration
    notification before the successful result arrives.
    """
    for msg_id in list(self._outstanding_dict[uuid]):
        if msg_id in self.results:
            # a result already arrived for this one; leave it alone
            continue
        try:
            raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
        except:
            content = error.wrap_exception()
        # build a fake apply_reply message for the dead task
        parent = {'msg_id': msg_id}
        header = {'engine': uuid, 'date': datetime.now()}
        self._handle_apply_reply(dict(parent_header=parent, header=header, content=content))
719
711
def _handle_execute_reply(self, msg):
    """Save the reply to an execute_request into our results.

    execute messages are never actually used. apply is used instead.
    """
    parent = msg['parent_header']
    msg_id = parent['msg_id']
    if msg_id in self.outstanding:
        self.outstanding.remove(msg_id)
    elif msg_id in self.history:
        print ("got stale result: %s"%msg_id)
    else:
        print ("got unknown result: %s"%msg_id)

    content = msg['content']
    header = msg['header']

    # construct metadata:
    md = self.metadata[msg_id]
    md.update(self._extract_metadata(header, parent, content))
    # is this redundant?
    self.metadata[msg_id] = md

    e_outstanding = self._outstanding_dict[md['engine_uuid']]
    if msg_id in e_outstanding:
        e_outstanding.remove(msg_id)

    # construct result:
    status = content['status']
    if status == 'ok':
        self.results[msg_id] = ExecuteReply(msg_id, content, md)
    elif status == 'aborted':
        self.results[msg_id] = error.TaskAborted(msg_id)
    elif status == 'resubmitted':
        # TODO: handle resubmission
        pass
    else:
        self.results[msg_id] = self._unwrap_exception(content)
759
751
760 def _handle_apply_reply(self, msg):
752 def _handle_apply_reply(self, msg):
761 """Save the reply to an apply_request into our results."""
753 """Save the reply to an apply_request into our results."""
762 parent = msg['parent_header']
754 parent = msg['parent_header']
763 msg_id = parent['msg_id']
755 msg_id = parent['msg_id']
764 if msg_id not in self.outstanding:
756 if msg_id not in self.outstanding:
765 if msg_id in self.history:
757 if msg_id in self.history:
766 print ("got stale result: %s"%msg_id)
758 print ("got stale result: %s"%msg_id)
767 print self.results[msg_id]
759 print self.results[msg_id]
768 print msg
760 print msg
769 else:
761 else:
770 print ("got unknown result: %s"%msg_id)
762 print ("got unknown result: %s"%msg_id)
771 else:
763 else:
772 self.outstanding.remove(msg_id)
764 self.outstanding.remove(msg_id)
773 content = msg['content']
765 content = msg['content']
774 header = msg['header']
766 header = msg['header']
775
767
776 # construct metadata:
768 # construct metadata:
777 md = self.metadata[msg_id]
769 md = self.metadata[msg_id]
778 md.update(self._extract_metadata(header, parent, content))
770 md.update(self._extract_metadata(header, parent, content))
779 # is this redundant?
771 # is this redundant?
780 self.metadata[msg_id] = md
772 self.metadata[msg_id] = md
781
773
782 e_outstanding = self._outstanding_dict[md['engine_uuid']]
774 e_outstanding = self._outstanding_dict[md['engine_uuid']]
783 if msg_id in e_outstanding:
775 if msg_id in e_outstanding:
784 e_outstanding.remove(msg_id)
776 e_outstanding.remove(msg_id)
785
777
786 # construct result:
778 # construct result:
787 if content['status'] == 'ok':
779 if content['status'] == 'ok':
788 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
780 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
789 elif content['status'] == 'aborted':
781 elif content['status'] == 'aborted':
790 self.results[msg_id] = error.TaskAborted(msg_id)
782 self.results[msg_id] = error.TaskAborted(msg_id)
791 elif content['status'] == 'resubmitted':
783 elif content['status'] == 'resubmitted':
792 # TODO: handle resubmission
784 # TODO: handle resubmission
793 pass
785 pass
794 else:
786 else:
795 self.results[msg_id] = self._unwrap_exception(content)
787 self.results[msg_id] = self._unwrap_exception(content)
796
788
def _flush_notifications(self):
    """Flush notifications of engine registrations waiting
    in ZMQ queue.

    Raises
    ------
    Exception
        If a notification arrives with no registered handler.
    """
    idents, msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
    while msg is not None:
        if self.debug:
            pprint(msg)
        msg_type = msg['header']['msg_type']
        handler = self._notification_handlers.get(msg_type, None)
        if handler is None:
            # BUGFIX: msg is a dict, so `msg.msg_type` raised AttributeError
            # instead of the intended error; use the extracted msg_type.
            raise Exception("Unhandled message type: %s"%msg_type)
        else:
            handler(msg)
        idents, msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
811
803
def _flush_results(self, sock):
    """Flush task or queue results waiting in ZMQ queue.

    Raises
    ------
    Exception
        If a reply arrives with no registered handler.
    """
    idents, msg = self.session.recv(sock, mode=zmq.NOBLOCK)
    while msg is not None:
        if self.debug:
            pprint(msg)
        msg_type = msg['header']['msg_type']
        handler = self._queue_handlers.get(msg_type, None)
        if handler is None:
            # BUGFIX: msg is a dict, so `msg.msg_type` raised AttributeError
            # instead of the intended error; use the extracted msg_type.
            raise Exception("Unhandled message type: %s"%msg_type)
        else:
            handler(msg)
        idents, msg = self.session.recv(sock, mode=zmq.NOBLOCK)
825
817
def _flush_control(self, sock):
    """Flush replies from the control channel waiting
    in the ZMQ queue.

    Currently: ignore them."""
    if self._ignored_control_replies <= 0:
        return
    _, reply = self.session.recv(sock, mode=zmq.NOBLOCK)
    while reply is not None:
        self._ignored_control_replies -= 1
        if self.debug:
            pprint(reply)
        _, reply = self.session.recv(sock, mode=zmq.NOBLOCK)
839
831
840 def _flush_ignored_control(self):
832 def _flush_ignored_control(self):
841 """flush ignored control replies"""
833 """flush ignored control replies"""
842 while self._ignored_control_replies > 0:
834 while self._ignored_control_replies > 0:
843 self.session.recv(self._control_socket)
835 self.session.recv(self._control_socket)
844 self._ignored_control_replies -= 1
836 self._ignored_control_replies -= 1
845
837
def _flush_ignored_hub_replies(self):
    """Drain any pending hub replies we don't care about from the query socket."""
    msg = True
    while msg is not None:
        _, msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
850
842
def _flush_iopub(self, sock):
    """Flush replies from the iopub channel waiting
    in the ZMQ queue.
    """
    idents, msg = self.session.recv(sock, mode=zmq.NOBLOCK)
    while msg is not None:
        if self.debug:
            pprint(msg)
        parent = msg['parent_header']
        # ignore IOPub messages with no parent.
        # Caused by print statements or warnings from before the first execution.
        if not parent:
            # BUGFIX: a bare `continue` here skipped the recv at the bottom
            # of the loop, re-testing the same non-None msg forever
            # (infinite loop). Fetch the next message before continuing.
            idents, msg = self.session.recv(sock, mode=zmq.NOBLOCK)
            continue
        msg_id = parent['msg_id']
        content = msg['content']
        header = msg['header']
        msg_type = header['msg_type']

        # init metadata:
        md = self.metadata[msg_id]

        if msg_type == 'stream':
            name = content['name']
            s = md[name] or ''
            md[name] = s + content['data']
        elif msg_type == 'pyerr':
            md.update({'pyerr' : self._unwrap_exception(content)})
        elif msg_type == 'pyin':
            md.update({'pyin' : content['code']})
        elif msg_type == 'display_data':
            md['outputs'].append(content)
        elif msg_type == 'pyout':
            md['pyout'] = content
        elif msg_type == 'status':
            # idle message comes after all outputs
            if content['execution_state'] == 'idle':
                md['outputs_ready'] = True
        else:
            # unhandled msg_type (status, etc.)
            pass

        # redundant?
        self.metadata[msg_id] = md

        idents, msg = self.session.recv(sock, mode=zmq.NOBLOCK)
896
888
897 #--------------------------------------------------------------------------
889 #--------------------------------------------------------------------------
898 # len, getitem
890 # len, getitem
899 #--------------------------------------------------------------------------
891 #--------------------------------------------------------------------------
900
892
901 def __len__(self):
893 def __len__(self):
902 """len(client) returns # of engines."""
894 """len(client) returns # of engines."""
903 return len(self.ids)
895 return len(self.ids)
904
896
def __getitem__(self, key):
    """index access returns DirectView multiplexer objects

    Must be int, slice, or list/tuple/xrange of ints"""
    if isinstance(key, (int, slice, tuple, list, xrange)):
        return self.direct_view(key)
    raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
913
905
914 #--------------------------------------------------------------------------
906 #--------------------------------------------------------------------------
915 # Begin public methods
907 # Begin public methods
916 #--------------------------------------------------------------------------
908 #--------------------------------------------------------------------------
917
909
@property
def ids(self):
    """Always up-to-date ids property."""
    # drain pending registration notifications first,
    # so the returned list reflects the latest engine set
    self._flush_notifications()
    return list(self._ids)  # always return a copy
924
916
def activate(self, targets='all', suffix=''):
    """Create a blocking DirectView and register it with IPython magics

    Defines the magics `%px, %autopx, %pxresult, %%px`

    Parameters
    ----------

    targets: int, list of ints, or 'all'
        The engines on which the view's magics will run
    suffix: str [default: '']
        The suffix, if any, for the magics. This allows multiple
        views to be associated with parallel magics at the same time.

        e.g. ``rc.activate(targets=0, suffix='0')`` yields the magics
        ``%px0``, ``%pxresult0``, etc. for running magics on engine 0.
    """
    view = self.direct_view(targets)
    view.block = True
    view.activate(suffix)
    return view
947
939
def close(self):
    """Close all of the client's sockets (idempotent).

    The shared zmq context is left alone.
    """
    if self._closed:
        return
    self.stop_spin_thread()
    # every socket attribute follows the '*socket' naming convention
    socket_names = [name for name in dir(self) if name.endswith('socket')]
    for name in socket_names:
        sock = getattr(self, name)
        if isinstance(sock, zmq.Socket) and not sock.closed:
            sock.close()
    self._closed = True
957
949
958 def _spin_every(self, interval=1):
950 def _spin_every(self, interval=1):
959 """target func for use in spin_thread"""
951 """target func for use in spin_thread"""
960 while True:
952 while True:
961 if self._stop_spinning.is_set():
953 if self._stop_spinning.is_set():
962 return
954 return
963 time.sleep(interval)
955 time.sleep(interval)
964 self.spin()
956 self.spin()
965
957
def spin_thread(self, interval=1):
    """call Client.spin() in a background thread on some regular interval

    This helps ensure that messages don't pile up too much in the zmq queue
    while you are working on other things, or just leaving an idle terminal.

    It also helps limit potential padding of the `received` timestamp
    on AsyncResult objects, used for timings.

    Parameters
    ----------

    interval : float, optional
        The interval on which to spin the client in the background thread
        (simply passed to time.sleep).

    Notes
    -----

    For precision timing, you may want to use this method to put a bound
    on the jitter (in seconds) in `received` timestamps used
    in AsyncResult.wall_time.

    """
    # replace any previously-running spin thread
    if self._spin_thread is not None:
        self.stop_spin_thread()
    self._stop_spinning.clear()
    worker = Thread(target=self._spin_every, args=(interval,))
    worker.daemon = True
    self._spin_thread = worker
    worker.start()
996
988
997 def stop_spin_thread(self):
989 def stop_spin_thread(self):
998 """stop background spin_thread, if any"""
990 """stop background spin_thread, if any"""
999 if self._spin_thread is not None:
991 if self._spin_thread is not None:
1000 self._stop_spinning.set()
992 self._stop_spinning.set()
1001 self._spin_thread.join()
993 self._spin_thread.join()
1002 self._spin_thread = None
994 self._spin_thread = None
1003
995
1004 def spin(self):
996 def spin(self):
1005 """Flush any registration notifications and execution results
997 """Flush any registration notifications and execution results
1006 waiting in the ZMQ queue.
998 waiting in the ZMQ queue.
1007 """
999 """
1008 if self._notification_socket:
1000 if self._notification_socket:
1009 self._flush_notifications()
1001 self._flush_notifications()
1010 if self._iopub_socket:
1002 if self._iopub_socket:
1011 self._flush_iopub(self._iopub_socket)
1003 self._flush_iopub(self._iopub_socket)
1012 if self._mux_socket:
1004 if self._mux_socket:
1013 self._flush_results(self._mux_socket)
1005 self._flush_results(self._mux_socket)
1014 if self._task_socket:
1006 if self._task_socket:
1015 self._flush_results(self._task_socket)
1007 self._flush_results(self._task_socket)
1016 if self._control_socket:
1008 if self._control_socket:
1017 self._flush_control(self._control_socket)
1009 self._flush_control(self._control_socket)
1018 if self._query_socket:
1010 if self._query_socket:
1019 self._flush_ignored_hub_replies()
1011 self._flush_ignored_hub_replies()
1020
1012
1021 def wait(self, jobs=None, timeout=-1):
1013 def wait(self, jobs=None, timeout=-1):
1022 """waits on one or more `jobs`, for up to `timeout` seconds.
1014 """waits on one or more `jobs`, for up to `timeout` seconds.
1023
1015
1024 Parameters
1016 Parameters
1025 ----------
1017 ----------
1026
1018
1027 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1019 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
1028 ints are indices to self.history
1020 ints are indices to self.history
1029 strs are msg_ids
1021 strs are msg_ids
1030 default: wait on all outstanding messages
1022 default: wait on all outstanding messages
1031 timeout : float
1023 timeout : float
1032 a time in seconds, after which to give up.
1024 a time in seconds, after which to give up.
1033 default is -1, which means no timeout
1025 default is -1, which means no timeout
1034
1026
1035 Returns
1027 Returns
1036 -------
1028 -------
1037
1029
1038 True : when all msg_ids are done
1030 True : when all msg_ids are done
1039 False : timeout reached, some msg_ids still outstanding
1031 False : timeout reached, some msg_ids still outstanding
1040 """
1032 """
1041 tic = time.time()
1033 tic = time.time()
1042 if jobs is None:
1034 if jobs is None:
1043 theids = self.outstanding
1035 theids = self.outstanding
1044 else:
1036 else:
1045 if isinstance(jobs, (int, basestring, AsyncResult)):
1037 if isinstance(jobs, (int, basestring, AsyncResult)):
1046 jobs = [jobs]
1038 jobs = [jobs]
1047 theids = set()
1039 theids = set()
1048 for job in jobs:
1040 for job in jobs:
1049 if isinstance(job, int):
1041 if isinstance(job, int):
1050 # index access
1042 # index access
1051 job = self.history[job]
1043 job = self.history[job]
1052 elif isinstance(job, AsyncResult):
1044 elif isinstance(job, AsyncResult):
1053 map(theids.add, job.msg_ids)
1045 map(theids.add, job.msg_ids)
1054 continue
1046 continue
1055 theids.add(job)
1047 theids.add(job)
1056 if not theids.intersection(self.outstanding):
1048 if not theids.intersection(self.outstanding):
1057 return True
1049 return True
1058 self.spin()
1050 self.spin()
1059 while theids.intersection(self.outstanding):
1051 while theids.intersection(self.outstanding):
1060 if timeout >= 0 and ( time.time()-tic ) > timeout:
1052 if timeout >= 0 and ( time.time()-tic ) > timeout:
1061 break
1053 break
1062 time.sleep(1e-3)
1054 time.sleep(1e-3)
1063 self.spin()
1055 self.spin()
1064 return len(theids.intersection(self.outstanding)) == 0
1056 return len(theids.intersection(self.outstanding)) == 0
1065
1057
1066 #--------------------------------------------------------------------------
1058 #--------------------------------------------------------------------------
1067 # Control methods
1059 # Control methods
1068 #--------------------------------------------------------------------------
1060 #--------------------------------------------------------------------------
1069
1061
1070 @spin_first
1062 @spin_first
1071 def clear(self, targets=None, block=None):
1063 def clear(self, targets=None, block=None):
1072 """Clear the namespace in target(s)."""
1064 """Clear the namespace in target(s)."""
1073 block = self.block if block is None else block
1065 block = self.block if block is None else block
1074 targets = self._build_targets(targets)[0]
1066 targets = self._build_targets(targets)[0]
1075 for t in targets:
1067 for t in targets:
1076 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1068 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
1077 error = False
1069 error = False
1078 if block:
1070 if block:
1079 self._flush_ignored_control()
1071 self._flush_ignored_control()
1080 for i in range(len(targets)):
1072 for i in range(len(targets)):
1081 idents,msg = self.session.recv(self._control_socket,0)
1073 idents,msg = self.session.recv(self._control_socket,0)
1082 if self.debug:
1074 if self.debug:
1083 pprint(msg)
1075 pprint(msg)
1084 if msg['content']['status'] != 'ok':
1076 if msg['content']['status'] != 'ok':
1085 error = self._unwrap_exception(msg['content'])
1077 error = self._unwrap_exception(msg['content'])
1086 else:
1078 else:
1087 self._ignored_control_replies += len(targets)
1079 self._ignored_control_replies += len(targets)
1088 if error:
1080 if error:
1089 raise error
1081 raise error
1090
1082
1091
1083
1092 @spin_first
1084 @spin_first
1093 def abort(self, jobs=None, targets=None, block=None):
1085 def abort(self, jobs=None, targets=None, block=None):
1094 """Abort specific jobs from the execution queues of target(s).
1086 """Abort specific jobs from the execution queues of target(s).
1095
1087
1096 This is a mechanism to prevent jobs that have already been submitted
1088 This is a mechanism to prevent jobs that have already been submitted
1097 from executing.
1089 from executing.
1098
1090
1099 Parameters
1091 Parameters
1100 ----------
1092 ----------
1101
1093
1102 jobs : msg_id, list of msg_ids, or AsyncResult
1094 jobs : msg_id, list of msg_ids, or AsyncResult
1103 The jobs to be aborted
1095 The jobs to be aborted
1104
1096
1105 If unspecified/None: abort all outstanding jobs.
1097 If unspecified/None: abort all outstanding jobs.
1106
1098
1107 """
1099 """
1108 block = self.block if block is None else block
1100 block = self.block if block is None else block
1109 jobs = jobs if jobs is not None else list(self.outstanding)
1101 jobs = jobs if jobs is not None else list(self.outstanding)
1110 targets = self._build_targets(targets)[0]
1102 targets = self._build_targets(targets)[0]
1111
1103
1112 msg_ids = []
1104 msg_ids = []
1113 if isinstance(jobs, (basestring,AsyncResult)):
1105 if isinstance(jobs, (basestring,AsyncResult)):
1114 jobs = [jobs]
1106 jobs = [jobs]
1115 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1107 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1116 if bad_ids:
1108 if bad_ids:
1117 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1109 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1118 for j in jobs:
1110 for j in jobs:
1119 if isinstance(j, AsyncResult):
1111 if isinstance(j, AsyncResult):
1120 msg_ids.extend(j.msg_ids)
1112 msg_ids.extend(j.msg_ids)
1121 else:
1113 else:
1122 msg_ids.append(j)
1114 msg_ids.append(j)
1123 content = dict(msg_ids=msg_ids)
1115 content = dict(msg_ids=msg_ids)
1124 for t in targets:
1116 for t in targets:
1125 self.session.send(self._control_socket, 'abort_request',
1117 self.session.send(self._control_socket, 'abort_request',
1126 content=content, ident=t)
1118 content=content, ident=t)
1127 error = False
1119 error = False
1128 if block:
1120 if block:
1129 self._flush_ignored_control()
1121 self._flush_ignored_control()
1130 for i in range(len(targets)):
1122 for i in range(len(targets)):
1131 idents,msg = self.session.recv(self._control_socket,0)
1123 idents,msg = self.session.recv(self._control_socket,0)
1132 if self.debug:
1124 if self.debug:
1133 pprint(msg)
1125 pprint(msg)
1134 if msg['content']['status'] != 'ok':
1126 if msg['content']['status'] != 'ok':
1135 error = self._unwrap_exception(msg['content'])
1127 error = self._unwrap_exception(msg['content'])
1136 else:
1128 else:
1137 self._ignored_control_replies += len(targets)
1129 self._ignored_control_replies += len(targets)
1138 if error:
1130 if error:
1139 raise error
1131 raise error
1140
1132
1141 @spin_first
1133 @spin_first
1142 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1134 def shutdown(self, targets='all', restart=False, hub=False, block=None):
1143 """Terminates one or more engine processes, optionally including the hub.
1135 """Terminates one or more engine processes, optionally including the hub.
1144
1136
1145 Parameters
1137 Parameters
1146 ----------
1138 ----------
1147
1139
1148 targets: list of ints or 'all' [default: all]
1140 targets: list of ints or 'all' [default: all]
1149 Which engines to shutdown.
1141 Which engines to shutdown.
1150 hub: bool [default: False]
1142 hub: bool [default: False]
1151 Whether to include the Hub. hub=True implies targets='all'.
1143 Whether to include the Hub. hub=True implies targets='all'.
1152 block: bool [default: self.block]
1144 block: bool [default: self.block]
1153 Whether to wait for clean shutdown replies or not.
1145 Whether to wait for clean shutdown replies or not.
1154 restart: bool [default: False]
1146 restart: bool [default: False]
1155 NOT IMPLEMENTED
1147 NOT IMPLEMENTED
1156 whether to restart engines after shutting them down.
1148 whether to restart engines after shutting them down.
1157 """
1149 """
1158
1150
1159 if restart:
1151 if restart:
1160 raise NotImplementedError("Engine restart is not yet implemented")
1152 raise NotImplementedError("Engine restart is not yet implemented")
1161
1153
1162 block = self.block if block is None else block
1154 block = self.block if block is None else block
1163 if hub:
1155 if hub:
1164 targets = 'all'
1156 targets = 'all'
1165 targets = self._build_targets(targets)[0]
1157 targets = self._build_targets(targets)[0]
1166 for t in targets:
1158 for t in targets:
1167 self.session.send(self._control_socket, 'shutdown_request',
1159 self.session.send(self._control_socket, 'shutdown_request',
1168 content={'restart':restart},ident=t)
1160 content={'restart':restart},ident=t)
1169 error = False
1161 error = False
1170 if block or hub:
1162 if block or hub:
1171 self._flush_ignored_control()
1163 self._flush_ignored_control()
1172 for i in range(len(targets)):
1164 for i in range(len(targets)):
1173 idents,msg = self.session.recv(self._control_socket, 0)
1165 idents,msg = self.session.recv(self._control_socket, 0)
1174 if self.debug:
1166 if self.debug:
1175 pprint(msg)
1167 pprint(msg)
1176 if msg['content']['status'] != 'ok':
1168 if msg['content']['status'] != 'ok':
1177 error = self._unwrap_exception(msg['content'])
1169 error = self._unwrap_exception(msg['content'])
1178 else:
1170 else:
1179 self._ignored_control_replies += len(targets)
1171 self._ignored_control_replies += len(targets)
1180
1172
1181 if hub:
1173 if hub:
1182 time.sleep(0.25)
1174 time.sleep(0.25)
1183 self.session.send(self._query_socket, 'shutdown_request')
1175 self.session.send(self._query_socket, 'shutdown_request')
1184 idents,msg = self.session.recv(self._query_socket, 0)
1176 idents,msg = self.session.recv(self._query_socket, 0)
1185 if self.debug:
1177 if self.debug:
1186 pprint(msg)
1178 pprint(msg)
1187 if msg['content']['status'] != 'ok':
1179 if msg['content']['status'] != 'ok':
1188 error = self._unwrap_exception(msg['content'])
1180 error = self._unwrap_exception(msg['content'])
1189
1181
1190 if error:
1182 if error:
1191 raise error
1183 raise error
1192
1184
1193 #--------------------------------------------------------------------------
1185 #--------------------------------------------------------------------------
1194 # Execution related methods
1186 # Execution related methods
1195 #--------------------------------------------------------------------------
1187 #--------------------------------------------------------------------------
1196
1188
1197 def _maybe_raise(self, result):
1189 def _maybe_raise(self, result):
1198 """wrapper for maybe raising an exception if apply failed."""
1190 """wrapper for maybe raising an exception if apply failed."""
1199 if isinstance(result, error.RemoteError):
1191 if isinstance(result, error.RemoteError):
1200 raise result
1192 raise result
1201
1193
1202 return result
1194 return result
1203
1195
1204 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1196 def send_apply_request(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
1205 ident=None):
1197 ident=None):
1206 """construct and send an apply message via a socket.
1198 """construct and send an apply message via a socket.
1207
1199
1208 This is the principal method with which all engine execution is performed by views.
1200 This is the principal method with which all engine execution is performed by views.
1209 """
1201 """
1210
1202
1211 if self._closed:
1203 if self._closed:
1212 raise RuntimeError("Client cannot be used after its sockets have been closed")
1204 raise RuntimeError("Client cannot be used after its sockets have been closed")
1213
1205
1214 # defaults:
1206 # defaults:
1215 args = args if args is not None else []
1207 args = args if args is not None else []
1216 kwargs = kwargs if kwargs is not None else {}
1208 kwargs = kwargs if kwargs is not None else {}
1217 subheader = subheader if subheader is not None else {}
1209 subheader = subheader if subheader is not None else {}
1218
1210
1219 # validate arguments
1211 # validate arguments
1220 if not callable(f) and not isinstance(f, Reference):
1212 if not callable(f) and not isinstance(f, Reference):
1221 raise TypeError("f must be callable, not %s"%type(f))
1213 raise TypeError("f must be callable, not %s"%type(f))
1222 if not isinstance(args, (tuple, list)):
1214 if not isinstance(args, (tuple, list)):
1223 raise TypeError("args must be tuple or list, not %s"%type(args))
1215 raise TypeError("args must be tuple or list, not %s"%type(args))
1224 if not isinstance(kwargs, dict):
1216 if not isinstance(kwargs, dict):
1225 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1217 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
1226 if not isinstance(subheader, dict):
1218 if not isinstance(subheader, dict):
1227 raise TypeError("subheader must be dict, not %s"%type(subheader))
1219 raise TypeError("subheader must be dict, not %s"%type(subheader))
1228
1220
1229 bufs = util.pack_apply_message(f,args,kwargs)
1221 bufs = util.pack_apply_message(f,args,kwargs)
1230
1222
1231 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1223 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
1232 subheader=subheader, track=track)
1224 subheader=subheader, track=track)
1233
1225
1234 msg_id = msg['header']['msg_id']
1226 msg_id = msg['header']['msg_id']
1235 self.outstanding.add(msg_id)
1227 self.outstanding.add(msg_id)
1236 if ident:
1228 if ident:
1237 # possibly routed to a specific engine
1229 # possibly routed to a specific engine
1238 if isinstance(ident, list):
1230 if isinstance(ident, list):
1239 ident = ident[-1]
1231 ident = ident[-1]
1240 if ident in self._engines.values():
1232 if ident in self._engines.values():
1241 # save for later, in case of engine death
1233 # save for later, in case of engine death
1242 self._outstanding_dict[ident].add(msg_id)
1234 self._outstanding_dict[ident].add(msg_id)
1243 self.history.append(msg_id)
1235 self.history.append(msg_id)
1244 self.metadata[msg_id]['submitted'] = datetime.now()
1236 self.metadata[msg_id]['submitted'] = datetime.now()
1245
1237
1246 return msg
1238 return msg
1247
1239
1248 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1240 def send_execute_request(self, socket, code, silent=True, subheader=None, ident=None):
1249 """construct and send an execute request via a socket.
1241 """construct and send an execute request via a socket.
1250
1242
1251 """
1243 """
1252
1244
1253 if self._closed:
1245 if self._closed:
1254 raise RuntimeError("Client cannot be used after its sockets have been closed")
1246 raise RuntimeError("Client cannot be used after its sockets have been closed")
1255
1247
1256 # defaults:
1248 # defaults:
1257 subheader = subheader if subheader is not None else {}
1249 subheader = subheader if subheader is not None else {}
1258
1250
1259 # validate arguments
1251 # validate arguments
1260 if not isinstance(code, basestring):
1252 if not isinstance(code, basestring):
1261 raise TypeError("code must be text, not %s" % type(code))
1253 raise TypeError("code must be text, not %s" % type(code))
1262 if not isinstance(subheader, dict):
1254 if not isinstance(subheader, dict):
1263 raise TypeError("subheader must be dict, not %s" % type(subheader))
1255 raise TypeError("subheader must be dict, not %s" % type(subheader))
1264
1256
1265 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1257 content = dict(code=code, silent=bool(silent), user_variables=[], user_expressions={})
1266
1258
1267
1259
1268 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1260 msg = self.session.send(socket, "execute_request", content=content, ident=ident,
1269 subheader=subheader)
1261 subheader=subheader)
1270
1262
1271 msg_id = msg['header']['msg_id']
1263 msg_id = msg['header']['msg_id']
1272 self.outstanding.add(msg_id)
1264 self.outstanding.add(msg_id)
1273 if ident:
1265 if ident:
1274 # possibly routed to a specific engine
1266 # possibly routed to a specific engine
1275 if isinstance(ident, list):
1267 if isinstance(ident, list):
1276 ident = ident[-1]
1268 ident = ident[-1]
1277 if ident in self._engines.values():
1269 if ident in self._engines.values():
1278 # save for later, in case of engine death
1270 # save for later, in case of engine death
1279 self._outstanding_dict[ident].add(msg_id)
1271 self._outstanding_dict[ident].add(msg_id)
1280 self.history.append(msg_id)
1272 self.history.append(msg_id)
1281 self.metadata[msg_id]['submitted'] = datetime.now()
1273 self.metadata[msg_id]['submitted'] = datetime.now()
1282
1274
1283 return msg
1275 return msg
1284
1276
1285 #--------------------------------------------------------------------------
1277 #--------------------------------------------------------------------------
1286 # construct a View object
1278 # construct a View object
1287 #--------------------------------------------------------------------------
1279 #--------------------------------------------------------------------------
1288
1280
1289 def load_balanced_view(self, targets=None):
1281 def load_balanced_view(self, targets=None):
1290 """construct a DirectView object.
1282 """construct a DirectView object.
1291
1283
1292 If no arguments are specified, create a LoadBalancedView
1284 If no arguments are specified, create a LoadBalancedView
1293 using all engines.
1285 using all engines.
1294
1286
1295 Parameters
1287 Parameters
1296 ----------
1288 ----------
1297
1289
1298 targets: list,slice,int,etc. [default: use all engines]
1290 targets: list,slice,int,etc. [default: use all engines]
1299 The subset of engines across which to load-balance
1291 The subset of engines across which to load-balance
1300 """
1292 """
1301 if targets == 'all':
1293 if targets == 'all':
1302 targets = None
1294 targets = None
1303 if targets is not None:
1295 if targets is not None:
1304 targets = self._build_targets(targets)[1]
1296 targets = self._build_targets(targets)[1]
1305 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1297 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
1306
1298
1307 def direct_view(self, targets='all'):
1299 def direct_view(self, targets='all'):
1308 """construct a DirectView object.
1300 """construct a DirectView object.
1309
1301
1310 If no targets are specified, create a DirectView using all engines.
1302 If no targets are specified, create a DirectView using all engines.
1311
1303
1312 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1304 rc.direct_view('all') is distinguished from rc[:] in that 'all' will
1313 evaluate the target engines at each execution, whereas rc[:] will connect to
1305 evaluate the target engines at each execution, whereas rc[:] will connect to
1314 all *current* engines, and that list will not change.
1306 all *current* engines, and that list will not change.
1315
1307
1316 That is, 'all' will always use all engines, whereas rc[:] will not use
1308 That is, 'all' will always use all engines, whereas rc[:] will not use
1317 engines added after the DirectView is constructed.
1309 engines added after the DirectView is constructed.
1318
1310
1319 Parameters
1311 Parameters
1320 ----------
1312 ----------
1321
1313
1322 targets: list,slice,int,etc. [default: use all engines]
1314 targets: list,slice,int,etc. [default: use all engines]
1323 The engines to use for the View
1315 The engines to use for the View
1324 """
1316 """
1325 single = isinstance(targets, int)
1317 single = isinstance(targets, int)
1326 # allow 'all' to be lazily evaluated at each execution
1318 # allow 'all' to be lazily evaluated at each execution
1327 if targets != 'all':
1319 if targets != 'all':
1328 targets = self._build_targets(targets)[1]
1320 targets = self._build_targets(targets)[1]
1329 if single:
1321 if single:
1330 targets = targets[0]
1322 targets = targets[0]
1331 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1323 return DirectView(client=self, socket=self._mux_socket, targets=targets)
1332
1324
1333 #--------------------------------------------------------------------------
1325 #--------------------------------------------------------------------------
1334 # Query methods
1326 # Query methods
1335 #--------------------------------------------------------------------------
1327 #--------------------------------------------------------------------------
1336
1328
1337 @spin_first
1329 @spin_first
1338 def get_result(self, indices_or_msg_ids=None, block=None):
1330 def get_result(self, indices_or_msg_ids=None, block=None):
1339 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1331 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1340
1332
1341 If the client already has the results, no request to the Hub will be made.
1333 If the client already has the results, no request to the Hub will be made.
1342
1334
1343 This is a convenient way to construct AsyncResult objects, which are wrappers
1335 This is a convenient way to construct AsyncResult objects, which are wrappers
1344 that include metadata about execution, and allow for awaiting results that
1336 that include metadata about execution, and allow for awaiting results that
1345 were not submitted by this Client.
1337 were not submitted by this Client.
1346
1338
1347 It can also be a convenient way to retrieve the metadata associated with
1339 It can also be a convenient way to retrieve the metadata associated with
1348 blocking execution, since it always retrieves
1340 blocking execution, since it always retrieves
1349
1341
1350 Examples
1342 Examples
1351 --------
1343 --------
1352 ::
1344 ::
1353
1345
1354 In [10]: r = client.apply()
1346 In [10]: r = client.apply()
1355
1347
1356 Parameters
1348 Parameters
1357 ----------
1349 ----------
1358
1350
1359 indices_or_msg_ids : integer history index, str msg_id, or list of either
1351 indices_or_msg_ids : integer history index, str msg_id, or list of either
1360 The indices or msg_ids of indices to be retrieved
1352 The indices or msg_ids of indices to be retrieved
1361
1353
1362 block : bool
1354 block : bool
1363 Whether to wait for the result to be done
1355 Whether to wait for the result to be done
1364
1356
1365 Returns
1357 Returns
1366 -------
1358 -------
1367
1359
1368 AsyncResult
1360 AsyncResult
1369 A single AsyncResult object will always be returned.
1361 A single AsyncResult object will always be returned.
1370
1362
1371 AsyncHubResult
1363 AsyncHubResult
1372 A subclass of AsyncResult that retrieves results from the Hub
1364 A subclass of AsyncResult that retrieves results from the Hub
1373
1365
1374 """
1366 """
1375 block = self.block if block is None else block
1367 block = self.block if block is None else block
1376 if indices_or_msg_ids is None:
1368 if indices_or_msg_ids is None:
1377 indices_or_msg_ids = -1
1369 indices_or_msg_ids = -1
1378
1370
1379 if not isinstance(indices_or_msg_ids, (list,tuple)):
1371 if not isinstance(indices_or_msg_ids, (list,tuple)):
1380 indices_or_msg_ids = [indices_or_msg_ids]
1372 indices_or_msg_ids = [indices_or_msg_ids]
1381
1373
1382 theids = []
1374 theids = []
1383 for id in indices_or_msg_ids:
1375 for id in indices_or_msg_ids:
1384 if isinstance(id, int):
1376 if isinstance(id, int):
1385 id = self.history[id]
1377 id = self.history[id]
1386 if not isinstance(id, basestring):
1378 if not isinstance(id, basestring):
1387 raise TypeError("indices must be str or int, not %r"%id)
1379 raise TypeError("indices must be str or int, not %r"%id)
1388 theids.append(id)
1380 theids.append(id)
1389
1381
1390 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1382 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1391 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1383 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1392
1384
1393 if remote_ids:
1385 if remote_ids:
1394 ar = AsyncHubResult(self, msg_ids=theids)
1386 ar = AsyncHubResult(self, msg_ids=theids)
1395 else:
1387 else:
1396 ar = AsyncResult(self, msg_ids=theids)
1388 ar = AsyncResult(self, msg_ids=theids)
1397
1389
1398 if block:
1390 if block:
1399 ar.wait()
1391 ar.wait()
1400
1392
1401 return ar
1393 return ar
1402
1394
1403 @spin_first
1395 @spin_first
1404 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1396 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1405 """Resubmit one or more tasks.
1397 """Resubmit one or more tasks.
1406
1398
1407 in-flight tasks may not be resubmitted.
1399 in-flight tasks may not be resubmitted.
1408
1400
1409 Parameters
1401 Parameters
1410 ----------
1402 ----------
1411
1403
1412 indices_or_msg_ids : integer history index, str msg_id, or list of either
1404 indices_or_msg_ids : integer history index, str msg_id, or list of either
1413 The indices or msg_ids of indices to be retrieved
1405 The indices or msg_ids of indices to be retrieved
1414
1406
1415 block : bool
1407 block : bool
1416 Whether to wait for the result to be done
1408 Whether to wait for the result to be done
1417
1409
1418 Returns
1410 Returns
1419 -------
1411 -------
1420
1412
1421 AsyncHubResult
1413 AsyncHubResult
1422 A subclass of AsyncResult that retrieves results from the Hub
1414 A subclass of AsyncResult that retrieves results from the Hub
1423
1415
1424 """
1416 """
1425 block = self.block if block is None else block
1417 block = self.block if block is None else block
1426 if indices_or_msg_ids is None:
1418 if indices_or_msg_ids is None:
1427 indices_or_msg_ids = -1
1419 indices_or_msg_ids = -1
1428
1420
1429 if not isinstance(indices_or_msg_ids, (list,tuple)):
1421 if not isinstance(indices_or_msg_ids, (list,tuple)):
1430 indices_or_msg_ids = [indices_or_msg_ids]
1422 indices_or_msg_ids = [indices_or_msg_ids]
1431
1423
1432 theids = []
1424 theids = []
1433 for id in indices_or_msg_ids:
1425 for id in indices_or_msg_ids:
1434 if isinstance(id, int):
1426 if isinstance(id, int):
1435 id = self.history[id]
1427 id = self.history[id]
1436 if not isinstance(id, basestring):
1428 if not isinstance(id, basestring):
1437 raise TypeError("indices must be str or int, not %r"%id)
1429 raise TypeError("indices must be str or int, not %r"%id)
1438 theids.append(id)
1430 theids.append(id)
1439
1431
1440 content = dict(msg_ids = theids)
1432 content = dict(msg_ids = theids)
1441
1433
1442 self.session.send(self._query_socket, 'resubmit_request', content)
1434 self.session.send(self._query_socket, 'resubmit_request', content)
1443
1435
1444 zmq.select([self._query_socket], [], [])
1436 zmq.select([self._query_socket], [], [])
1445 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1437 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1446 if self.debug:
1438 if self.debug:
1447 pprint(msg)
1439 pprint(msg)
1448 content = msg['content']
1440 content = msg['content']
1449 if content['status'] != 'ok':
1441 if content['status'] != 'ok':
1450 raise self._unwrap_exception(content)
1442 raise self._unwrap_exception(content)
1451 mapping = content['resubmitted']
1443 mapping = content['resubmitted']
1452 new_ids = [ mapping[msg_id] for msg_id in theids ]
1444 new_ids = [ mapping[msg_id] for msg_id in theids ]
1453
1445
1454 ar = AsyncHubResult(self, msg_ids=new_ids)
1446 ar = AsyncHubResult(self, msg_ids=new_ids)
1455
1447
1456 if block:
1448 if block:
1457 ar.wait()
1449 ar.wait()
1458
1450
1459 return ar
1451 return ar
1460
1452
1461 @spin_first
1453 @spin_first
1462 def result_status(self, msg_ids, status_only=True):
1454 def result_status(self, msg_ids, status_only=True):
1463 """Check on the status of the result(s) of the apply request with `msg_ids`.
1455 """Check on the status of the result(s) of the apply request with `msg_ids`.
1464
1456
1465 If status_only is False, then the actual results will be retrieved, else
1457 If status_only is False, then the actual results will be retrieved, else
1466 only the status of the results will be checked.
1458 only the status of the results will be checked.
1467
1459
1468 Parameters
1460 Parameters
1469 ----------
1461 ----------
1470
1462
1471 msg_ids : list of msg_ids
1463 msg_ids : list of msg_ids
1472 if int:
1464 if int:
1473 Passed as index to self.history for convenience.
1465 Passed as index to self.history for convenience.
1474 status_only : bool (default: True)
1466 status_only : bool (default: True)
1475 if False:
1467 if False:
1476 Retrieve the actual results of completed tasks.
1468 Retrieve the actual results of completed tasks.
1477
1469
1478 Returns
1470 Returns
1479 -------
1471 -------
1480
1472
1481 results : dict
1473 results : dict
1482 There will always be the keys 'pending' and 'completed', which will
1474 There will always be the keys 'pending' and 'completed', which will
1483 be lists of msg_ids that are incomplete or complete. If `status_only`
1475 be lists of msg_ids that are incomplete or complete. If `status_only`
1484 is False, then completed results will be keyed by their `msg_id`.
1476 is False, then completed results will be keyed by their `msg_id`.
1485 """
1477 """
1486 if not isinstance(msg_ids, (list,tuple)):
1478 if not isinstance(msg_ids, (list,tuple)):
1487 msg_ids = [msg_ids]
1479 msg_ids = [msg_ids]
1488
1480
1489 theids = []
1481 theids = []
1490 for msg_id in msg_ids:
1482 for msg_id in msg_ids:
1491 if isinstance(msg_id, int):
1483 if isinstance(msg_id, int):
1492 msg_id = self.history[msg_id]
1484 msg_id = self.history[msg_id]
1493 if not isinstance(msg_id, basestring):
1485 if not isinstance(msg_id, basestring):
1494 raise TypeError("msg_ids must be str, not %r"%msg_id)
1486 raise TypeError("msg_ids must be str, not %r"%msg_id)
1495 theids.append(msg_id)
1487 theids.append(msg_id)
1496
1488
1497 completed = []
1489 completed = []
1498 local_results = {}
1490 local_results = {}
1499
1491
1500 # comment this block out to temporarily disable local shortcut:
1492 # comment this block out to temporarily disable local shortcut:
1501 for msg_id in theids:
1493 for msg_id in theids:
1502 if msg_id in self.results:
1494 if msg_id in self.results:
1503 completed.append(msg_id)
1495 completed.append(msg_id)
1504 local_results[msg_id] = self.results[msg_id]
1496 local_results[msg_id] = self.results[msg_id]
1505 theids.remove(msg_id)
1497 theids.remove(msg_id)
1506
1498
1507 if theids: # some not locally cached
1499 if theids: # some not locally cached
1508 content = dict(msg_ids=theids, status_only=status_only)
1500 content = dict(msg_ids=theids, status_only=status_only)
1509 msg = self.session.send(self._query_socket, "result_request", content=content)
1501 msg = self.session.send(self._query_socket, "result_request", content=content)
1510 zmq.select([self._query_socket], [], [])
1502 zmq.select([self._query_socket], [], [])
1511 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1503 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1512 if self.debug:
1504 if self.debug:
1513 pprint(msg)
1505 pprint(msg)
1514 content = msg['content']
1506 content = msg['content']
1515 if content['status'] != 'ok':
1507 if content['status'] != 'ok':
1516 raise self._unwrap_exception(content)
1508 raise self._unwrap_exception(content)
1517 buffers = msg['buffers']
1509 buffers = msg['buffers']
1518 else:
1510 else:
1519 content = dict(completed=[],pending=[])
1511 content = dict(completed=[],pending=[])
1520
1512
1521 content['completed'].extend(completed)
1513 content['completed'].extend(completed)
1522
1514
1523 if status_only:
1515 if status_only:
1524 return content
1516 return content
1525
1517
1526 failures = []
1518 failures = []
1527 # load cached results into result:
1519 # load cached results into result:
1528 content.update(local_results)
1520 content.update(local_results)
1529
1521
1530 # update cache with results:
1522 # update cache with results:
1531 for msg_id in sorted(theids):
1523 for msg_id in sorted(theids):
1532 if msg_id in content['completed']:
1524 if msg_id in content['completed']:
1533 rec = content[msg_id]
1525 rec = content[msg_id]
1534 parent = rec['header']
1526 parent = rec['header']
1535 header = rec['result_header']
1527 header = rec['result_header']
1536 rcontent = rec['result_content']
1528 rcontent = rec['result_content']
1537 iodict = rec['io']
1529 iodict = rec['io']
1538 if isinstance(rcontent, str):
1530 if isinstance(rcontent, str):
1539 rcontent = self.session.unpack(rcontent)
1531 rcontent = self.session.unpack(rcontent)
1540
1532
1541 md = self.metadata[msg_id]
1533 md = self.metadata[msg_id]
1542 md.update(self._extract_metadata(header, parent, rcontent))
1534 md.update(self._extract_metadata(header, parent, rcontent))
1543 if rec.get('received'):
1535 if rec.get('received'):
1544 md['received'] = rec['received']
1536 md['received'] = rec['received']
1545 md.update(iodict)
1537 md.update(iodict)
1546
1538
1547 if rcontent['status'] == 'ok':
1539 if rcontent['status'] == 'ok':
1548 if header['msg_type'] == 'apply_reply':
1540 if header['msg_type'] == 'apply_reply':
1549 res,buffers = util.unserialize_object(buffers)
1541 res,buffers = util.unserialize_object(buffers)
1550 elif header['msg_type'] == 'execute_reply':
1542 elif header['msg_type'] == 'execute_reply':
1551 res = ExecuteReply(msg_id, rcontent, md)
1543 res = ExecuteReply(msg_id, rcontent, md)
1552 else:
1544 else:
1553 raise KeyError("unhandled msg type: %r" % header[msg_type])
1545 raise KeyError("unhandled msg type: %r" % header[msg_type])
1554 else:
1546 else:
1555 res = self._unwrap_exception(rcontent)
1547 res = self._unwrap_exception(rcontent)
1556 failures.append(res)
1548 failures.append(res)
1557
1549
1558 self.results[msg_id] = res
1550 self.results[msg_id] = res
1559 content[msg_id] = res
1551 content[msg_id] = res
1560
1552
1561 if len(theids) == 1 and failures:
1553 if len(theids) == 1 and failures:
1562 raise failures[0]
1554 raise failures[0]
1563
1555
1564 error.collect_exceptions(failures, "result_status")
1556 error.collect_exceptions(failures, "result_status")
1565 return content
1557 return content
1566
1558
1567 @spin_first
1559 @spin_first
1568 def queue_status(self, targets='all', verbose=False):
1560 def queue_status(self, targets='all', verbose=False):
1569 """Fetch the status of engine queues.
1561 """Fetch the status of engine queues.
1570
1562
1571 Parameters
1563 Parameters
1572 ----------
1564 ----------
1573
1565
1574 targets : int/str/list of ints/strs
1566 targets : int/str/list of ints/strs
1575 the engines whose states are to be queried.
1567 the engines whose states are to be queried.
1576 default : all
1568 default : all
1577 verbose : bool
1569 verbose : bool
1578 Whether to return lengths only, or lists of ids for each element
1570 Whether to return lengths only, or lists of ids for each element
1579 """
1571 """
1580 if targets == 'all':
1572 if targets == 'all':
1581 # allow 'all' to be evaluated on the engine
1573 # allow 'all' to be evaluated on the engine
1582 engine_ids = None
1574 engine_ids = None
1583 else:
1575 else:
1584 engine_ids = self._build_targets(targets)[1]
1576 engine_ids = self._build_targets(targets)[1]
1585 content = dict(targets=engine_ids, verbose=verbose)
1577 content = dict(targets=engine_ids, verbose=verbose)
1586 self.session.send(self._query_socket, "queue_request", content=content)
1578 self.session.send(self._query_socket, "queue_request", content=content)
1587 idents,msg = self.session.recv(self._query_socket, 0)
1579 idents,msg = self.session.recv(self._query_socket, 0)
1588 if self.debug:
1580 if self.debug:
1589 pprint(msg)
1581 pprint(msg)
1590 content = msg['content']
1582 content = msg['content']
1591 status = content.pop('status')
1583 status = content.pop('status')
1592 if status != 'ok':
1584 if status != 'ok':
1593 raise self._unwrap_exception(content)
1585 raise self._unwrap_exception(content)
1594 content = rekey(content)
1586 content = rekey(content)
1595 if isinstance(targets, int):
1587 if isinstance(targets, int):
1596 return content[targets]
1588 return content[targets]
1597 else:
1589 else:
1598 return content
1590 return content
1599
1591
1600 @spin_first
1592 @spin_first
1601 def purge_results(self, jobs=[], targets=[]):
1593 def purge_results(self, jobs=[], targets=[]):
1602 """Tell the Hub to forget results.
1594 """Tell the Hub to forget results.
1603
1595
1604 Individual results can be purged by msg_id, or the entire
1596 Individual results can be purged by msg_id, or the entire
1605 history of specific targets can be purged.
1597 history of specific targets can be purged.
1606
1598
1607 Use `purge_results('all')` to scrub everything from the Hub's db.
1599 Use `purge_results('all')` to scrub everything from the Hub's db.
1608
1600
1609 Parameters
1601 Parameters
1610 ----------
1602 ----------
1611
1603
1612 jobs : str or list of str or AsyncResult objects
1604 jobs : str or list of str or AsyncResult objects
1613 the msg_ids whose results should be forgotten.
1605 the msg_ids whose results should be forgotten.
1614 targets : int/str/list of ints/strs
1606 targets : int/str/list of ints/strs
1615 The targets, by int_id, whose entire history is to be purged.
1607 The targets, by int_id, whose entire history is to be purged.
1616
1608
1617 default : None
1609 default : None
1618 """
1610 """
1619 if not targets and not jobs:
1611 if not targets and not jobs:
1620 raise ValueError("Must specify at least one of `targets` and `jobs`")
1612 raise ValueError("Must specify at least one of `targets` and `jobs`")
1621 if targets:
1613 if targets:
1622 targets = self._build_targets(targets)[1]
1614 targets = self._build_targets(targets)[1]
1623
1615
1624 # construct msg_ids from jobs
1616 # construct msg_ids from jobs
1625 if jobs == 'all':
1617 if jobs == 'all':
1626 msg_ids = jobs
1618 msg_ids = jobs
1627 else:
1619 else:
1628 msg_ids = []
1620 msg_ids = []
1629 if isinstance(jobs, (basestring,AsyncResult)):
1621 if isinstance(jobs, (basestring,AsyncResult)):
1630 jobs = [jobs]
1622 jobs = [jobs]
1631 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1623 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1632 if bad_ids:
1624 if bad_ids:
1633 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1625 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1634 for j in jobs:
1626 for j in jobs:
1635 if isinstance(j, AsyncResult):
1627 if isinstance(j, AsyncResult):
1636 msg_ids.extend(j.msg_ids)
1628 msg_ids.extend(j.msg_ids)
1637 else:
1629 else:
1638 msg_ids.append(j)
1630 msg_ids.append(j)
1639
1631
1640 content = dict(engine_ids=targets, msg_ids=msg_ids)
1632 content = dict(engine_ids=targets, msg_ids=msg_ids)
1641 self.session.send(self._query_socket, "purge_request", content=content)
1633 self.session.send(self._query_socket, "purge_request", content=content)
1642 idents, msg = self.session.recv(self._query_socket, 0)
1634 idents, msg = self.session.recv(self._query_socket, 0)
1643 if self.debug:
1635 if self.debug:
1644 pprint(msg)
1636 pprint(msg)
1645 content = msg['content']
1637 content = msg['content']
1646 if content['status'] != 'ok':
1638 if content['status'] != 'ok':
1647 raise self._unwrap_exception(content)
1639 raise self._unwrap_exception(content)
1648
1640
1649 @spin_first
1641 @spin_first
1650 def hub_history(self):
1642 def hub_history(self):
1651 """Get the Hub's history
1643 """Get the Hub's history
1652
1644
1653 Just like the Client, the Hub has a history, which is a list of msg_ids.
1645 Just like the Client, the Hub has a history, which is a list of msg_ids.
1654 This will contain the history of all clients, and, depending on configuration,
1646 This will contain the history of all clients, and, depending on configuration,
1655 may contain history across multiple cluster sessions.
1647 may contain history across multiple cluster sessions.
1656
1648
1657 Any msg_id returned here is a valid argument to `get_result`.
1649 Any msg_id returned here is a valid argument to `get_result`.
1658
1650
1659 Returns
1651 Returns
1660 -------
1652 -------
1661
1653
1662 msg_ids : list of strs
1654 msg_ids : list of strs
1663 list of all msg_ids, ordered by task submission time.
1655 list of all msg_ids, ordered by task submission time.
1664 """
1656 """
1665
1657
1666 self.session.send(self._query_socket, "history_request", content={})
1658 self.session.send(self._query_socket, "history_request", content={})
1667 idents, msg = self.session.recv(self._query_socket, 0)
1659 idents, msg = self.session.recv(self._query_socket, 0)
1668
1660
1669 if self.debug:
1661 if self.debug:
1670 pprint(msg)
1662 pprint(msg)
1671 content = msg['content']
1663 content = msg['content']
1672 if content['status'] != 'ok':
1664 if content['status'] != 'ok':
1673 raise self._unwrap_exception(content)
1665 raise self._unwrap_exception(content)
1674 else:
1666 else:
1675 return content['history']
1667 return content['history']
1676
1668
1677 @spin_first
1669 @spin_first
1678 def db_query(self, query, keys=None):
1670 def db_query(self, query, keys=None):
1679 """Query the Hub's TaskRecord database
1671 """Query the Hub's TaskRecord database
1680
1672
1681 This will return a list of task record dicts that match `query`
1673 This will return a list of task record dicts that match `query`
1682
1674
1683 Parameters
1675 Parameters
1684 ----------
1676 ----------
1685
1677
1686 query : mongodb query dict
1678 query : mongodb query dict
1687 The search dict. See mongodb query docs for details.
1679 The search dict. See mongodb query docs for details.
1688 keys : list of strs [optional]
1680 keys : list of strs [optional]
1689 The subset of keys to be returned. The default is to fetch everything but buffers.
1681 The subset of keys to be returned. The default is to fetch everything but buffers.
1690 'msg_id' will *always* be included.
1682 'msg_id' will *always* be included.
1691 """
1683 """
1692 if isinstance(keys, basestring):
1684 if isinstance(keys, basestring):
1693 keys = [keys]
1685 keys = [keys]
1694 content = dict(query=query, keys=keys)
1686 content = dict(query=query, keys=keys)
1695 self.session.send(self._query_socket, "db_request", content=content)
1687 self.session.send(self._query_socket, "db_request", content=content)
1696 idents, msg = self.session.recv(self._query_socket, 0)
1688 idents, msg = self.session.recv(self._query_socket, 0)
1697 if self.debug:
1689 if self.debug:
1698 pprint(msg)
1690 pprint(msg)
1699 content = msg['content']
1691 content = msg['content']
1700 if content['status'] != 'ok':
1692 if content['status'] != 'ok':
1701 raise self._unwrap_exception(content)
1693 raise self._unwrap_exception(content)
1702
1694
1703 records = content['records']
1695 records = content['records']
1704
1696
1705 buffer_lens = content['buffer_lens']
1697 buffer_lens = content['buffer_lens']
1706 result_buffer_lens = content['result_buffer_lens']
1698 result_buffer_lens = content['result_buffer_lens']
1707 buffers = msg['buffers']
1699 buffers = msg['buffers']
1708 has_bufs = buffer_lens is not None
1700 has_bufs = buffer_lens is not None
1709 has_rbufs = result_buffer_lens is not None
1701 has_rbufs = result_buffer_lens is not None
1710 for i,rec in enumerate(records):
1702 for i,rec in enumerate(records):
1711 # relink buffers
1703 # relink buffers
1712 if has_bufs:
1704 if has_bufs:
1713 blen = buffer_lens[i]
1705 blen = buffer_lens[i]
1714 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1706 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1715 if has_rbufs:
1707 if has_rbufs:
1716 blen = result_buffer_lens[i]
1708 blen = result_buffer_lens[i]
1717 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1709 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1718
1710
1719 return records
1711 return records
1720
1712
# public API of this module
__all__ = ['Client']
@@ -1,1337 +1,1338 b''
1 """The IPython Controller Hub with 0MQ
1 """The IPython Controller Hub with 0MQ
2 This is the master object that handles connections from engines and clients,
2 This is the master object that handles connections from engines and clients,
3 and monitors traffic through the various queues.
3 and monitors traffic through the various queues.
4
4
5 Authors:
5 Authors:
6
6
7 * Min RK
7 * Min RK
8 """
8 """
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Copyright (C) 2010-2011 The IPython Development Team
10 # Copyright (C) 2010-2011 The IPython Development Team
11 #
11 #
12 # Distributed under the terms of the BSD License. The full license is in
12 # Distributed under the terms of the BSD License. The full license is in
13 # the file COPYING, distributed as part of this software.
13 # the file COPYING, distributed as part of this software.
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Imports
17 # Imports
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 from __future__ import print_function
19 from __future__ import print_function
20
20
21 import sys
21 import sys
22 import time
22 import time
23 from datetime import datetime
23 from datetime import datetime
24
24
25 import zmq
25 import zmq
26 from zmq.eventloop import ioloop
26 from zmq.eventloop import ioloop
27 from zmq.eventloop.zmqstream import ZMQStream
27 from zmq.eventloop.zmqstream import ZMQStream
28
28
29 # internal:
29 # internal:
30 from IPython.utils.importstring import import_item
30 from IPython.utils.importstring import import_item
31 from IPython.utils.py3compat import cast_bytes
31 from IPython.utils.py3compat import cast_bytes
32 from IPython.utils.traitlets import (
32 from IPython.utils.traitlets import (
33 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
33 HasTraits, Instance, Integer, Unicode, Dict, Set, Tuple, CBytes, DottedObjectName
34 )
34 )
35
35
36 from IPython.parallel import error, util
36 from IPython.parallel import error, util
37 from IPython.parallel.factory import RegistrationFactory
37 from IPython.parallel.factory import RegistrationFactory
38
38
39 from IPython.zmq.session import SessionFactory
39 from IPython.zmq.session import SessionFactory
40
40
41 from .heartmonitor import HeartMonitor
41 from .heartmonitor import HeartMonitor
42
42
43 #-----------------------------------------------------------------------------
43 #-----------------------------------------------------------------------------
44 # Code
44 # Code
45 #-----------------------------------------------------------------------------
45 #-----------------------------------------------------------------------------
46
46
47 def _passer(*args, **kwargs):
47 def _passer(*args, **kwargs):
48 return
48 return
49
49
50 def _printer(*args, **kwargs):
50 def _printer(*args, **kwargs):
51 print (args)
51 print (args)
52 print (kwargs)
52 print (kwargs)
53
53
def empty_record():
    """Return an empty dict with all record keys."""
    record = dict.fromkeys([
        'msg_id', 'header', 'content', 'buffers', 'submitted',
        'client_uuid', 'engine_uuid', 'started', 'completed',
        'resubmitted', 'received', 'result_header', 'result_content',
        'result_buffers', 'queue', 'pyin', 'pyout', 'pyerr',
    ])
    # stream captures default to empty strings, not None
    record['stdout'] = ''
    record['stderr'] = ''
    return record
78
78
def init_record(msg):
    """Initialize a TaskRecord based on a request.

    Seeds msg_id/header/content/buffers/submitted from *msg*; every
    other lifecycle field starts as None (stream captures as '').
    """
    header = msg['header']
    record = dict.fromkeys([
        'client_uuid', 'engine_uuid', 'started', 'completed',
        'resubmitted', 'received', 'result_header', 'result_content',
        'result_buffers', 'queue', 'pyin', 'pyout', 'pyerr',
    ])
    record.update(
        msg_id=header['msg_id'],
        header=header,
        content=msg['content'],
        buffers=msg['buffers'],
        submitted=header['date'],
        stdout='',
        stderr='',
    )
    return record
104
104
105
105
class EngineConnector(HasTraits):
    """A simple object for accessing the various zmq connections of an object.
    Attributes are:
    id (int): engine ID
    uuid (str): uuid (unused?)
    queue (str): identity of queue's DEALER socket
    registration (str): identity of registration DEALER socket
    heartbeat (str): identity of heartbeat DEALER socket
    """

    # integer engine id assigned by the Hub
    id = Integer(0)
    # zmq socket identities (bytes) for each connection role
    queue = CBytes()
    control = CBytes()
    registration = CBytes()
    heartbeat = CBytes()
    # msg_ids currently outstanding on this engine
    pending = Set()
121
121
122 _db_shortcuts = {
122 _db_shortcuts = {
123 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
123 'sqlitedb' : 'IPython.parallel.controller.sqlitedb.SQLiteDB',
124 'mongodb' : 'IPython.parallel.controller.mongodb.MongoDB',
124 'mongodb' : 'IPython.parallel.controller.mongodb.MongoDB',
125 'dictdb' : 'IPython.parallel.controller.dictdb.DictDB',
125 'dictdb' : 'IPython.parallel.controller.dictdb.DictDB',
126 'nodb' : 'IPython.parallel.controller.dictdb.NoDB',
126 'nodb' : 'IPython.parallel.controller.dictdb.NoDB',
127 }
127 }
128
128
129 class HubFactory(RegistrationFactory):
129 class HubFactory(RegistrationFactory):
130 """The Configurable for setting up a Hub."""
130 """The Configurable for setting up a Hub."""
131
131
132 # port-pairs for monitoredqueues:
132 # port-pairs for monitoredqueues:
133 hb = Tuple(Integer,Integer,config=True,
133 hb = Tuple(Integer,Integer,config=True,
134 help="""DEALER/SUB Port pair for Engine heartbeats""")
134 help="""DEALER/SUB Port pair for Engine heartbeats""")
135 def _hb_default(self):
135 def _hb_default(self):
136 return tuple(util.select_random_ports(2))
136 return tuple(util.select_random_ports(2))
137
137
138 mux = Tuple(Integer,Integer,config=True,
138 mux = Tuple(Integer,Integer,config=True,
139 help="""Engine/Client Port pair for MUX queue""")
139 help="""Engine/Client Port pair for MUX queue""")
140
140
141 def _mux_default(self):
141 def _mux_default(self):
142 return tuple(util.select_random_ports(2))
142 return tuple(util.select_random_ports(2))
143
143
144 task = Tuple(Integer,Integer,config=True,
144 task = Tuple(Integer,Integer,config=True,
145 help="""Engine/Client Port pair for Task queue""")
145 help="""Engine/Client Port pair for Task queue""")
146 def _task_default(self):
146 def _task_default(self):
147 return tuple(util.select_random_ports(2))
147 return tuple(util.select_random_ports(2))
148
148
149 control = Tuple(Integer,Integer,config=True,
149 control = Tuple(Integer,Integer,config=True,
150 help="""Engine/Client Port pair for Control queue""")
150 help="""Engine/Client Port pair for Control queue""")
151
151
152 def _control_default(self):
152 def _control_default(self):
153 return tuple(util.select_random_ports(2))
153 return tuple(util.select_random_ports(2))
154
154
155 iopub = Tuple(Integer,Integer,config=True,
155 iopub = Tuple(Integer,Integer,config=True,
156 help="""Engine/Client Port pair for IOPub relay""")
156 help="""Engine/Client Port pair for IOPub relay""")
157
157
158 def _iopub_default(self):
158 def _iopub_default(self):
159 return tuple(util.select_random_ports(2))
159 return tuple(util.select_random_ports(2))
160
160
161 # single ports:
161 # single ports:
162 mon_port = Integer(config=True,
162 mon_port = Integer(config=True,
163 help="""Monitor (SUB) port for queue traffic""")
163 help="""Monitor (SUB) port for queue traffic""")
164
164
165 def _mon_port_default(self):
165 def _mon_port_default(self):
166 return util.select_random_ports(1)[0]
166 return util.select_random_ports(1)[0]
167
167
168 notifier_port = Integer(config=True,
168 notifier_port = Integer(config=True,
169 help="""PUB port for sending engine status notifications""")
169 help="""PUB port for sending engine status notifications""")
170
170
171 def _notifier_port_default(self):
171 def _notifier_port_default(self):
172 return util.select_random_ports(1)[0]
172 return util.select_random_ports(1)[0]
173
173
174 engine_ip = Unicode('127.0.0.1', config=True,
174 engine_ip = Unicode('127.0.0.1', config=True,
175 help="IP on which to listen for engine connections. [default: loopback]")
175 help="IP on which to listen for engine connections. [default: loopback]")
176 engine_transport = Unicode('tcp', config=True,
176 engine_transport = Unicode('tcp', config=True,
177 help="0MQ transport for engine connections. [default: tcp]")
177 help="0MQ transport for engine connections. [default: tcp]")
178
178
179 client_ip = Unicode('127.0.0.1', config=True,
179 client_ip = Unicode('127.0.0.1', config=True,
180 help="IP on which to listen for client connections. [default: loopback]")
180 help="IP on which to listen for client connections. [default: loopback]")
181 client_transport = Unicode('tcp', config=True,
181 client_transport = Unicode('tcp', config=True,
182 help="0MQ transport for client connections. [default : tcp]")
182 help="0MQ transport for client connections. [default : tcp]")
183
183
184 monitor_ip = Unicode('127.0.0.1', config=True,
184 monitor_ip = Unicode('127.0.0.1', config=True,
185 help="IP on which to listen for monitor messages. [default: loopback]")
185 help="IP on which to listen for monitor messages. [default: loopback]")
186 monitor_transport = Unicode('tcp', config=True,
186 monitor_transport = Unicode('tcp', config=True,
187 help="0MQ transport for monitor messages. [default : tcp]")
187 help="0MQ transport for monitor messages. [default : tcp]")
188
188
189 monitor_url = Unicode('')
189 monitor_url = Unicode('')
190
190
191 db_class = DottedObjectName('NoDB',
191 db_class = DottedObjectName('NoDB',
192 config=True, help="""The class to use for the DB backend
192 config=True, help="""The class to use for the DB backend
193
193
194 Options include:
194 Options include:
195
195
196 SQLiteDB: SQLite
196 SQLiteDB: SQLite
197 MongoDB : use MongoDB
197 MongoDB : use MongoDB
198 DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
198 DictDB : in-memory storage (fastest, but be mindful of memory growth of the Hub)
199 NoDB : disable database altogether (default)
199 NoDB : disable database altogether (default)
200
200
201 """)
201 """)
202
202
203 # not configurable
203 # not configurable
204 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
204 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
205 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
205 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
206
206
207 def _ip_changed(self, name, old, new):
207 def _ip_changed(self, name, old, new):
208 self.engine_ip = new
208 self.engine_ip = new
209 self.client_ip = new
209 self.client_ip = new
210 self.monitor_ip = new
210 self.monitor_ip = new
211 self._update_monitor_url()
211 self._update_monitor_url()
212
212
213 def _update_monitor_url(self):
213 def _update_monitor_url(self):
214 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
214 self.monitor_url = "%s://%s:%i" % (self.monitor_transport, self.monitor_ip, self.mon_port)
215
215
216 def _transport_changed(self, name, old, new):
216 def _transport_changed(self, name, old, new):
217 self.engine_transport = new
217 self.engine_transport = new
218 self.client_transport = new
218 self.client_transport = new
219 self.monitor_transport = new
219 self.monitor_transport = new
220 self._update_monitor_url()
220 self._update_monitor_url()
221
221
222 def __init__(self, **kwargs):
222 def __init__(self, **kwargs):
223 super(HubFactory, self).__init__(**kwargs)
223 super(HubFactory, self).__init__(**kwargs)
224 self._update_monitor_url()
224 self._update_monitor_url()
225
225
226
226
227 def construct(self):
227 def construct(self):
228 self.init_hub()
228 self.init_hub()
229
229
230 def start(self):
230 def start(self):
231 self.heartmonitor.start()
231 self.heartmonitor.start()
232 self.log.info("Heartmonitor started")
232 self.log.info("Heartmonitor started")
233
233
234 def init_hub(self):
234 def init_hub(self):
235 """construct"""
235 """construct"""
236 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
236 client_iface = "%s://%s:" % (self.client_transport, self.client_ip) + "%i"
237 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
237 engine_iface = "%s://%s:" % (self.engine_transport, self.engine_ip) + "%i"
238
238
239 ctx = self.context
239 ctx = self.context
240 loop = self.loop
240 loop = self.loop
241
241
242 # Registrar socket
242 # Registrar socket
243 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
243 q = ZMQStream(ctx.socket(zmq.ROUTER), loop)
244 q.bind(client_iface % self.regport)
244 q.bind(client_iface % self.regport)
245 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
245 self.log.info("Hub listening on %s for registration.", client_iface % self.regport)
246 if self.client_ip != self.engine_ip:
246 if self.client_ip != self.engine_ip:
247 q.bind(engine_iface % self.regport)
247 q.bind(engine_iface % self.regport)
248 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
248 self.log.info("Hub listening on %s for registration.", engine_iface % self.regport)
249
249
250 ### Engine connections ###
250 ### Engine connections ###
251
251
252 # heartbeat
252 # heartbeat
253 hpub = ctx.socket(zmq.PUB)
253 hpub = ctx.socket(zmq.PUB)
254 hpub.bind(engine_iface % self.hb[0])
254 hpub.bind(engine_iface % self.hb[0])
255 hrep = ctx.socket(zmq.ROUTER)
255 hrep = ctx.socket(zmq.ROUTER)
256 hrep.bind(engine_iface % self.hb[1])
256 hrep.bind(engine_iface % self.hb[1])
257 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
257 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
258 pingstream=ZMQStream(hpub,loop),
258 pingstream=ZMQStream(hpub,loop),
259 pongstream=ZMQStream(hrep,loop)
259 pongstream=ZMQStream(hrep,loop)
260 )
260 )
261
261
262 ### Client connections ###
262 ### Client connections ###
263 # Notifier socket
263 # Notifier socket
264 n = ZMQStream(ctx.socket(zmq.PUB), loop)
264 n = ZMQStream(ctx.socket(zmq.PUB), loop)
265 n.bind(client_iface%self.notifier_port)
265 n.bind(client_iface%self.notifier_port)
266
266
267 ### build and launch the queues ###
267 ### build and launch the queues ###
268
268
269 # monitor socket
269 # monitor socket
270 sub = ctx.socket(zmq.SUB)
270 sub = ctx.socket(zmq.SUB)
271 sub.setsockopt(zmq.SUBSCRIBE, b"")
271 sub.setsockopt(zmq.SUBSCRIBE, b"")
272 sub.bind(self.monitor_url)
272 sub.bind(self.monitor_url)
273 sub.bind('inproc://monitor')
273 sub.bind('inproc://monitor')
274 sub = ZMQStream(sub, loop)
274 sub = ZMQStream(sub, loop)
275
275
276 # connect the db
276 # connect the db
277 db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
277 db_class = _db_shortcuts.get(self.db_class.lower(), self.db_class)
278 self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
278 self.log.info('Hub using DB backend: %r', (db_class.split('.')[-1]))
279 self.db = import_item(str(db_class))(session=self.session.session,
279 self.db = import_item(str(db_class))(session=self.session.session,
280 config=self.config, log=self.log)
280 config=self.config, log=self.log)
281 time.sleep(.25)
281 time.sleep(.25)
282 try:
282 try:
283 scheme = self.config.TaskScheduler.scheme_name
283 scheme = self.config.TaskScheduler.scheme_name
284 except AttributeError:
284 except AttributeError:
285 from .scheduler import TaskScheduler
285 from .scheduler import TaskScheduler
286 scheme = TaskScheduler.scheme_name.get_default_value()
286 scheme = TaskScheduler.scheme_name.get_default_value()
287 # build connection dicts
287 # build connection dicts
288 self.engine_info = {
288 self.engine_info = {
289 'control' : engine_iface%self.control[1],
289 'control' : engine_iface%self.control[1],
290 'mux': engine_iface%self.mux[1],
290 'mux': engine_iface%self.mux[1],
291 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
291 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
292 'task' : engine_iface%self.task[1],
292 'task' : engine_iface%self.task[1],
293 'iopub' : engine_iface%self.iopub[1],
293 'iopub' : engine_iface%self.iopub[1],
294 # 'monitor' : engine_iface%self.mon_port,
294 # 'monitor' : engine_iface%self.mon_port,
295 }
295 }
296
296
297 self.client_info = {
297 self.client_info = {
298 'control' : client_iface%self.control[0],
298 'control' : client_iface%self.control[0],
299 'mux': client_iface%self.mux[0],
299 'mux': client_iface%self.mux[0],
300 'task' : (scheme, client_iface%self.task[0]),
300 'task' : (scheme, client_iface%self.task[0]),
301 'iopub' : client_iface%self.iopub[0],
301 'iopub' : client_iface%self.iopub[0],
302 'notification': client_iface%self.notifier_port
302 'notification': client_iface%self.notifier_port
303 }
303 }
304 self.log.debug("Hub engine addrs: %s", self.engine_info)
304 self.log.debug("Hub engine addrs: %s", self.engine_info)
305 self.log.debug("Hub client addrs: %s", self.client_info)
305 self.log.debug("Hub client addrs: %s", self.client_info)
306
306
307 # resubmit stream
307 # resubmit stream
308 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
308 r = ZMQStream(ctx.socket(zmq.DEALER), loop)
309 url = util.disambiguate_url(self.client_info['task'][-1])
309 url = util.disambiguate_url(self.client_info['task'][-1])
310 r.setsockopt(zmq.IDENTITY, self.session.bsession)
310 r.setsockopt(zmq.IDENTITY, self.session.bsession)
311 r.connect(url)
311 r.connect(url)
312
312
313 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
313 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
314 query=q, notifier=n, resubmit=r, db=self.db,
314 query=q, notifier=n, resubmit=r, db=self.db,
315 engine_info=self.engine_info, client_info=self.client_info,
315 engine_info=self.engine_info, client_info=self.client_info,
316 log=self.log)
316 log=self.log)
317
317
318
318
319 class Hub(SessionFactory):
319 class Hub(SessionFactory):
320 """The IPython Controller Hub with 0MQ connections
320 """The IPython Controller Hub with 0MQ connections
321
321
322 Parameters
322 Parameters
323 ==========
323 ==========
324 loop: zmq IOLoop instance
324 loop: zmq IOLoop instance
325 session: Session object
325 session: Session object
326 <removed> context: zmq context for creating new connections (?)
326 <removed> context: zmq context for creating new connections (?)
327 queue: ZMQStream for monitoring the command queue (SUB)
327 queue: ZMQStream for monitoring the command queue (SUB)
328 query: ZMQStream for engine registration and client queries requests (ROUTER)
328 query: ZMQStream for engine registration and client queries requests (ROUTER)
329 heartbeat: HeartMonitor object checking the pulse of the engines
329 heartbeat: HeartMonitor object checking the pulse of the engines
330 notifier: ZMQStream for broadcasting engine registration changes (PUB)
330 notifier: ZMQStream for broadcasting engine registration changes (PUB)
331 db: connection to db for out of memory logging of commands
331 db: connection to db for out of memory logging of commands
332 NotImplemented
332 NotImplemented
333 engine_info: dict of zmq connection information for engines to connect
333 engine_info: dict of zmq connection information for engines to connect
334 to the queues.
334 to the queues.
335 client_info: dict of zmq connection information for engines to connect
335 client_info: dict of zmq connection information for engines to connect
336 to the queues.
336 to the queues.
337 """
337 """
338 # internal data structures:
338 # internal data structures:
339 ids=Set() # engine IDs
339 ids=Set() # engine IDs
340 keytable=Dict()
340 keytable=Dict()
341 by_ident=Dict()
341 by_ident=Dict()
342 engines=Dict()
342 engines=Dict()
343 clients=Dict()
343 clients=Dict()
344 hearts=Dict()
344 hearts=Dict()
345 pending=Set()
345 pending=Set()
346 queues=Dict() # pending msg_ids keyed by engine_id
346 queues=Dict() # pending msg_ids keyed by engine_id
347 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
347 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
348 completed=Dict() # completed msg_ids keyed by engine_id
348 completed=Dict() # completed msg_ids keyed by engine_id
349 all_completed=Set() # completed msg_ids keyed by engine_id
349 all_completed=Set() # completed msg_ids keyed by engine_id
350 dead_engines=Set() # completed msg_ids keyed by engine_id
350 dead_engines=Set() # completed msg_ids keyed by engine_id
351 unassigned=Set() # set of task msg_ds not yet assigned a destination
351 unassigned=Set() # set of task msg_ds not yet assigned a destination
352 incoming_registrations=Dict()
352 incoming_registrations=Dict()
353 registration_timeout=Integer()
353 registration_timeout=Integer()
354 _idcounter=Integer(0)
354 _idcounter=Integer(0)
355
355
356 # objects from constructor:
356 # objects from constructor:
357 query=Instance(ZMQStream)
357 query=Instance(ZMQStream)
358 monitor=Instance(ZMQStream)
358 monitor=Instance(ZMQStream)
359 notifier=Instance(ZMQStream)
359 notifier=Instance(ZMQStream)
360 resubmit=Instance(ZMQStream)
360 resubmit=Instance(ZMQStream)
361 heartmonitor=Instance(HeartMonitor)
361 heartmonitor=Instance(HeartMonitor)
362 db=Instance(object)
362 db=Instance(object)
363 client_info=Dict()
363 client_info=Dict()
364 engine_info=Dict()
364 engine_info=Dict()
365
365
366
366
367 def __init__(self, **kwargs):
367 def __init__(self, **kwargs):
368 """
368 """
369 # universal:
369 # universal:
370 loop: IOLoop for creating future connections
370 loop: IOLoop for creating future connections
371 session: streamsession for sending serialized data
371 session: streamsession for sending serialized data
372 # engine:
372 # engine:
373 queue: ZMQStream for monitoring queue messages
373 queue: ZMQStream for monitoring queue messages
374 query: ZMQStream for engine+client registration and client requests
374 query: ZMQStream for engine+client registration and client requests
375 heartbeat: HeartMonitor object for tracking engines
375 heartbeat: HeartMonitor object for tracking engines
376 # extra:
376 # extra:
377 db: ZMQStream for db connection (NotImplemented)
377 db: ZMQStream for db connection (NotImplemented)
378 engine_info: zmq address/protocol dict for engine connections
378 engine_info: zmq address/protocol dict for engine connections
379 client_info: zmq address/protocol dict for client connections
379 client_info: zmq address/protocol dict for client connections
380 """
380 """
381
381
382 super(Hub, self).__init__(**kwargs)
382 super(Hub, self).__init__(**kwargs)
383 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
383 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
384
384
385 # validate connection dicts:
385 # validate connection dicts:
386 for k,v in self.client_info.iteritems():
386 for k,v in self.client_info.iteritems():
387 if k == 'task':
387 if k == 'task':
388 util.validate_url_container(v[1])
388 util.validate_url_container(v[1])
389 else:
389 else:
390 util.validate_url_container(v)
390 util.validate_url_container(v)
391 # util.validate_url_container(self.client_info)
391 # util.validate_url_container(self.client_info)
392 util.validate_url_container(self.engine_info)
392 util.validate_url_container(self.engine_info)
393
393
394 # register our callbacks
394 # register our callbacks
395 self.query.on_recv(self.dispatch_query)
395 self.query.on_recv(self.dispatch_query)
396 self.monitor.on_recv(self.dispatch_monitor_traffic)
396 self.monitor.on_recv(self.dispatch_monitor_traffic)
397
397
398 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
398 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
399 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
399 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
400
400
401 self.monitor_handlers = {b'in' : self.save_queue_request,
401 self.monitor_handlers = {b'in' : self.save_queue_request,
402 b'out': self.save_queue_result,
402 b'out': self.save_queue_result,
403 b'intask': self.save_task_request,
403 b'intask': self.save_task_request,
404 b'outtask': self.save_task_result,
404 b'outtask': self.save_task_result,
405 b'tracktask': self.save_task_destination,
405 b'tracktask': self.save_task_destination,
406 b'incontrol': _passer,
406 b'incontrol': _passer,
407 b'outcontrol': _passer,
407 b'outcontrol': _passer,
408 b'iopub': self.save_iopub_message,
408 b'iopub': self.save_iopub_message,
409 }
409 }
410
410
411 self.query_handlers = {'queue_request': self.queue_status,
411 self.query_handlers = {'queue_request': self.queue_status,
412 'result_request': self.get_results,
412 'result_request': self.get_results,
413 'history_request': self.get_history,
413 'history_request': self.get_history,
414 'db_request': self.db_query,
414 'db_request': self.db_query,
415 'purge_request': self.purge_results,
415 'purge_request': self.purge_results,
416 'load_request': self.check_load,
416 'load_request': self.check_load,
417 'resubmit_request': self.resubmit_task,
417 'resubmit_request': self.resubmit_task,
418 'shutdown_request': self.shutdown_request,
418 'shutdown_request': self.shutdown_request,
419 'registration_request' : self.register_engine,
419 'registration_request' : self.register_engine,
420 'unregistration_request' : self.unregister_engine,
420 'unregistration_request' : self.unregister_engine,
421 'connection_request': self.connection_request,
421 'connection_request': self.connection_request,
422 }
422 }
423
423
424 # ignore resubmit replies
424 # ignore resubmit replies
425 self.resubmit.on_recv(lambda msg: None, copy=False)
425 self.resubmit.on_recv(lambda msg: None, copy=False)
426
426
427 self.log.info("hub::created hub")
427 self.log.info("hub::created hub")
428
428
429 @property
429 @property
430 def _next_id(self):
430 def _next_id(self):
431 """gemerate a new ID.
431 """gemerate a new ID.
432
432
433 No longer reuse old ids, just count from 0."""
433 No longer reuse old ids, just count from 0."""
434 newid = self._idcounter
434 newid = self._idcounter
435 self._idcounter += 1
435 self._idcounter += 1
436 return newid
436 return newid
437 # newid = 0
437 # newid = 0
438 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
438 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
439 # # print newid, self.ids, self.incoming_registrations
439 # # print newid, self.ids, self.incoming_registrations
440 # while newid in self.ids or newid in incoming:
440 # while newid in self.ids or newid in incoming:
441 # newid += 1
441 # newid += 1
442 # return newid
442 # return newid
443
443
444 #-----------------------------------------------------------------------------
444 #-----------------------------------------------------------------------------
445 # message validation
445 # message validation
446 #-----------------------------------------------------------------------------
446 #-----------------------------------------------------------------------------
447
447
448 def _validate_targets(self, targets):
448 def _validate_targets(self, targets):
449 """turn any valid targets argument into a list of integer ids"""
449 """turn any valid targets argument into a list of integer ids"""
450 if targets is None:
450 if targets is None:
451 # default to all
451 # default to all
452 return self.ids
452 return self.ids
453
453
454 if isinstance(targets, (int,str,unicode)):
454 if isinstance(targets, (int,str,unicode)):
455 # only one target specified
455 # only one target specified
456 targets = [targets]
456 targets = [targets]
457 _targets = []
457 _targets = []
458 for t in targets:
458 for t in targets:
459 # map raw identities to ids
459 # map raw identities to ids
460 if isinstance(t, (str,unicode)):
460 if isinstance(t, (str,unicode)):
461 t = self.by_ident.get(cast_bytes(t), t)
461 t = self.by_ident.get(cast_bytes(t), t)
462 _targets.append(t)
462 _targets.append(t)
463 targets = _targets
463 targets = _targets
464 bad_targets = [ t for t in targets if t not in self.ids ]
464 bad_targets = [ t for t in targets if t not in self.ids ]
465 if bad_targets:
465 if bad_targets:
466 raise IndexError("No Such Engine: %r" % bad_targets)
466 raise IndexError("No Such Engine: %r" % bad_targets)
467 if not targets:
467 if not targets:
468 raise IndexError("No Engines Registered")
468 raise IndexError("No Engines Registered")
469 return targets
469 return targets
470
470
471 #-----------------------------------------------------------------------------
471 #-----------------------------------------------------------------------------
472 # dispatch methods (1 per stream)
472 # dispatch methods (1 per stream)
473 #-----------------------------------------------------------------------------
473 #-----------------------------------------------------------------------------
474
474
475
475
476 @util.log_errors
476 @util.log_errors
477 def dispatch_monitor_traffic(self, msg):
477 def dispatch_monitor_traffic(self, msg):
478 """all ME and Task queue messages come through here, as well as
478 """all ME and Task queue messages come through here, as well as
479 IOPub traffic."""
479 IOPub traffic."""
480 self.log.debug("monitor traffic: %r", msg[0])
480 self.log.debug("monitor traffic: %r", msg[0])
481 switch = msg[0]
481 switch = msg[0]
482 try:
482 try:
483 idents, msg = self.session.feed_identities(msg[1:])
483 idents, msg = self.session.feed_identities(msg[1:])
484 except ValueError:
484 except ValueError:
485 idents=[]
485 idents=[]
486 if not idents:
486 if not idents:
487 self.log.error("Monitor message without topic: %r", msg)
487 self.log.error("Monitor message without topic: %r", msg)
488 return
488 return
489 handler = self.monitor_handlers.get(switch, None)
489 handler = self.monitor_handlers.get(switch, None)
490 if handler is not None:
490 if handler is not None:
491 handler(idents, msg)
491 handler(idents, msg)
492 else:
492 else:
493 self.log.error("Unrecognized monitor topic: %r", switch)
493 self.log.error("Unrecognized monitor topic: %r", switch)
494
494
495
495
496 @util.log_errors
496 @util.log_errors
497 def dispatch_query(self, msg):
497 def dispatch_query(self, msg):
498 """Route registration requests and queries from clients."""
498 """Route registration requests and queries from clients."""
499 try:
499 try:
500 idents, msg = self.session.feed_identities(msg)
500 idents, msg = self.session.feed_identities(msg)
501 except ValueError:
501 except ValueError:
502 idents = []
502 idents = []
503 if not idents:
503 if not idents:
504 self.log.error("Bad Query Message: %r", msg)
504 self.log.error("Bad Query Message: %r", msg)
505 return
505 return
506 client_id = idents[0]
506 client_id = idents[0]
507 try:
507 try:
508 msg = self.session.unserialize(msg, content=True)
508 msg = self.session.unserialize(msg, content=True)
509 except Exception:
509 except Exception:
510 content = error.wrap_exception()
510 content = error.wrap_exception()
511 self.log.error("Bad Query Message: %r", msg, exc_info=True)
511 self.log.error("Bad Query Message: %r", msg, exc_info=True)
512 self.session.send(self.query, "hub_error", ident=client_id,
512 self.session.send(self.query, "hub_error", ident=client_id,
513 content=content)
513 content=content)
514 return
514 return
515 # print client_id, header, parent, content
515 # print client_id, header, parent, content
516 #switch on message type:
516 #switch on message type:
517 msg_type = msg['header']['msg_type']
517 msg_type = msg['header']['msg_type']
518 self.log.info("client::client %r requested %r", client_id, msg_type)
518 self.log.info("client::client %r requested %r", client_id, msg_type)
519 handler = self.query_handlers.get(msg_type, None)
519 handler = self.query_handlers.get(msg_type, None)
520 try:
520 try:
521 assert handler is not None, "Bad Message Type: %r" % msg_type
521 assert handler is not None, "Bad Message Type: %r" % msg_type
522 except:
522 except:
523 content = error.wrap_exception()
523 content = error.wrap_exception()
524 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
524 self.log.error("Bad Message Type: %r", msg_type, exc_info=True)
525 self.session.send(self.query, "hub_error", ident=client_id,
525 self.session.send(self.query, "hub_error", ident=client_id,
526 content=content)
526 content=content)
527 return
527 return
528
528
529 else:
529 else:
530 handler(idents, msg)
530 handler(idents, msg)
531
531
532 def dispatch_db(self, msg):
532 def dispatch_db(self, msg):
533 """"""
533 """"""
534 raise NotImplementedError
534 raise NotImplementedError
535
535
536 #---------------------------------------------------------------------------
536 #---------------------------------------------------------------------------
537 # handler methods (1 per event)
537 # handler methods (1 per event)
538 #---------------------------------------------------------------------------
538 #---------------------------------------------------------------------------
539
539
540 #----------------------- Heartbeat --------------------------------------
540 #----------------------- Heartbeat --------------------------------------
541
541
542 def handle_new_heart(self, heart):
542 def handle_new_heart(self, heart):
543 """handler to attach to heartbeater.
543 """handler to attach to heartbeater.
544 Called when a new heart starts to beat.
544 Called when a new heart starts to beat.
545 Triggers completion of registration."""
545 Triggers completion of registration."""
546 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
546 self.log.debug("heartbeat::handle_new_heart(%r)", heart)
547 if heart not in self.incoming_registrations:
547 if heart not in self.incoming_registrations:
548 self.log.info("heartbeat::ignoring new heart: %r", heart)
548 self.log.info("heartbeat::ignoring new heart: %r", heart)
549 else:
549 else:
550 self.finish_registration(heart)
550 self.finish_registration(heart)
551
551
552
552
553 def handle_heart_failure(self, heart):
553 def handle_heart_failure(self, heart):
554 """handler to attach to heartbeater.
554 """handler to attach to heartbeater.
555 called when a previously registered heart fails to respond to beat request.
555 called when a previously registered heart fails to respond to beat request.
556 triggers unregistration"""
556 triggers unregistration"""
557 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
557 self.log.debug("heartbeat::handle_heart_failure(%r)", heart)
558 eid = self.hearts.get(heart, None)
558 eid = self.hearts.get(heart, None)
559 queue = self.engines[eid].queue
559 queue = self.engines[eid].queue
560 if eid is None or self.keytable[eid] in self.dead_engines:
560 if eid is None or self.keytable[eid] in self.dead_engines:
561 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
561 self.log.info("heartbeat::ignoring heart failure %r (not an engine or already dead)", heart)
562 else:
562 else:
563 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
563 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
564
564
565 #----------------------- MUX Queue Traffic ------------------------------
565 #----------------------- MUX Queue Traffic ------------------------------
566
566
567 def save_queue_request(self, idents, msg):
567 def save_queue_request(self, idents, msg):
568 if len(idents) < 2:
568 if len(idents) < 2:
569 self.log.error("invalid identity prefix: %r", idents)
569 self.log.error("invalid identity prefix: %r", idents)
570 return
570 return
571 queue_id, client_id = idents[:2]
571 queue_id, client_id = idents[:2]
572 try:
572 try:
573 msg = self.session.unserialize(msg)
573 msg = self.session.unserialize(msg)
574 except Exception:
574 except Exception:
575 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
575 self.log.error("queue::client %r sent invalid message to %r: %r", client_id, queue_id, msg, exc_info=True)
576 return
576 return
577
577
578 eid = self.by_ident.get(queue_id, None)
578 eid = self.by_ident.get(queue_id, None)
579 if eid is None:
579 if eid is None:
580 self.log.error("queue::target %r not registered", queue_id)
580 self.log.error("queue::target %r not registered", queue_id)
581 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
581 self.log.debug("queue:: valid are: %r", self.by_ident.keys())
582 return
582 return
583 record = init_record(msg)
583 record = init_record(msg)
584 msg_id = record['msg_id']
584 msg_id = record['msg_id']
585 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
585 self.log.info("queue::client %r submitted request %r to %s", client_id, msg_id, eid)
586 # Unicode in records
586 # Unicode in records
587 record['engine_uuid'] = queue_id.decode('ascii')
587 record['engine_uuid'] = queue_id.decode('ascii')
588 record['client_uuid'] = client_id.decode('ascii')
588 record['client_uuid'] = msg['header']['session']
589 record['queue'] = 'mux'
589 record['queue'] = 'mux'
590
590
591 try:
591 try:
592 # it's posible iopub arrived first:
592 # it's posible iopub arrived first:
593 existing = self.db.get_record(msg_id)
593 existing = self.db.get_record(msg_id)
594 for key,evalue in existing.iteritems():
594 for key,evalue in existing.iteritems():
595 rvalue = record.get(key, None)
595 rvalue = record.get(key, None)
596 if evalue and rvalue and evalue != rvalue:
596 if evalue and rvalue and evalue != rvalue:
597 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
597 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
598 elif evalue and not rvalue:
598 elif evalue and not rvalue:
599 record[key] = evalue
599 record[key] = evalue
600 try:
600 try:
601 self.db.update_record(msg_id, record)
601 self.db.update_record(msg_id, record)
602 except Exception:
602 except Exception:
603 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
603 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
604 except KeyError:
604 except KeyError:
605 try:
605 try:
606 self.db.add_record(msg_id, record)
606 self.db.add_record(msg_id, record)
607 except Exception:
607 except Exception:
608 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
608 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
609
609
610
610
611 self.pending.add(msg_id)
611 self.pending.add(msg_id)
612 self.queues[eid].append(msg_id)
612 self.queues[eid].append(msg_id)
613
613
614 def save_queue_result(self, idents, msg):
614 def save_queue_result(self, idents, msg):
615 if len(idents) < 2:
615 if len(idents) < 2:
616 self.log.error("invalid identity prefix: %r", idents)
616 self.log.error("invalid identity prefix: %r", idents)
617 return
617 return
618
618
619 client_id, queue_id = idents[:2]
619 client_id, queue_id = idents[:2]
620 try:
620 try:
621 msg = self.session.unserialize(msg)
621 msg = self.session.unserialize(msg)
622 except Exception:
622 except Exception:
623 self.log.error("queue::engine %r sent invalid message to %r: %r",
623 self.log.error("queue::engine %r sent invalid message to %r: %r",
624 queue_id, client_id, msg, exc_info=True)
624 queue_id, client_id, msg, exc_info=True)
625 return
625 return
626
626
627 eid = self.by_ident.get(queue_id, None)
627 eid = self.by_ident.get(queue_id, None)
628 if eid is None:
628 if eid is None:
629 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
629 self.log.error("queue::unknown engine %r is sending a reply: ", queue_id)
630 return
630 return
631
631
632 parent = msg['parent_header']
632 parent = msg['parent_header']
633 if not parent:
633 if not parent:
634 return
634 return
635 msg_id = parent['msg_id']
635 msg_id = parent['msg_id']
636 if msg_id in self.pending:
636 if msg_id in self.pending:
637 self.pending.remove(msg_id)
637 self.pending.remove(msg_id)
638 self.all_completed.add(msg_id)
638 self.all_completed.add(msg_id)
639 self.queues[eid].remove(msg_id)
639 self.queues[eid].remove(msg_id)
640 self.completed[eid].append(msg_id)
640 self.completed[eid].append(msg_id)
641 self.log.info("queue::request %r completed on %s", msg_id, eid)
641 self.log.info("queue::request %r completed on %s", msg_id, eid)
642 elif msg_id not in self.all_completed:
642 elif msg_id not in self.all_completed:
643 # it could be a result from a dead engine that died before delivering the
643 # it could be a result from a dead engine that died before delivering the
644 # result
644 # result
645 self.log.warn("queue:: unknown msg finished %r", msg_id)
645 self.log.warn("queue:: unknown msg finished %r", msg_id)
646 return
646 return
647 # update record anyway, because the unregistration could have been premature
647 # update record anyway, because the unregistration could have been premature
648 rheader = msg['header']
648 rheader = msg['header']
649 completed = rheader['date']
649 completed = rheader['date']
650 started = rheader.get('started', None)
650 started = rheader.get('started', None)
651 result = {
651 result = {
652 'result_header' : rheader,
652 'result_header' : rheader,
653 'result_content': msg['content'],
653 'result_content': msg['content'],
654 'received': datetime.now(),
654 'received': datetime.now(),
655 'started' : started,
655 'started' : started,
656 'completed' : completed
656 'completed' : completed
657 }
657 }
658
658
659 result['result_buffers'] = msg['buffers']
659 result['result_buffers'] = msg['buffers']
660 try:
660 try:
661 self.db.update_record(msg_id, result)
661 self.db.update_record(msg_id, result)
662 except Exception:
662 except Exception:
663 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
663 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
664
664
665
665
666 #--------------------- Task Queue Traffic ------------------------------
666 #--------------------- Task Queue Traffic ------------------------------
667
667
668 def save_task_request(self, idents, msg):
668 def save_task_request(self, idents, msg):
669 """Save the submission of a task."""
669 """Save the submission of a task."""
670 client_id = idents[0]
670 client_id = idents[0]
671
671
672 try:
672 try:
673 msg = self.session.unserialize(msg)
673 msg = self.session.unserialize(msg)
674 except Exception:
674 except Exception:
675 self.log.error("task::client %r sent invalid task message: %r",
675 self.log.error("task::client %r sent invalid task message: %r",
676 client_id, msg, exc_info=True)
676 client_id, msg, exc_info=True)
677 return
677 return
678 record = init_record(msg)
678 record = init_record(msg)
679
679
680 record['client_uuid'] = client_id.decode('ascii')
680 record['client_uuid'] = msg['header']['session']
681 record['queue'] = 'task'
681 record['queue'] = 'task'
682 header = msg['header']
682 header = msg['header']
683 msg_id = header['msg_id']
683 msg_id = header['msg_id']
684 self.pending.add(msg_id)
684 self.pending.add(msg_id)
685 self.unassigned.add(msg_id)
685 self.unassigned.add(msg_id)
686 try:
686 try:
687 # it's posible iopub arrived first:
687 # it's posible iopub arrived first:
688 existing = self.db.get_record(msg_id)
688 existing = self.db.get_record(msg_id)
689 if existing['resubmitted']:
689 if existing['resubmitted']:
690 for key in ('submitted', 'client_uuid', 'buffers'):
690 for key in ('submitted', 'client_uuid', 'buffers'):
691 # don't clobber these keys on resubmit
691 # don't clobber these keys on resubmit
692 # submitted and client_uuid should be different
692 # submitted and client_uuid should be different
693 # and buffers might be big, and shouldn't have changed
693 # and buffers might be big, and shouldn't have changed
694 record.pop(key)
694 record.pop(key)
695 # still check content,header which should not change
695 # still check content,header which should not change
696 # but are not expensive to compare as buffers
696 # but are not expensive to compare as buffers
697
697
698 for key,evalue in existing.iteritems():
698 for key,evalue in existing.iteritems():
699 if key.endswith('buffers'):
699 if key.endswith('buffers'):
700 # don't compare buffers
700 # don't compare buffers
701 continue
701 continue
702 rvalue = record.get(key, None)
702 rvalue = record.get(key, None)
703 if evalue and rvalue and evalue != rvalue:
703 if evalue and rvalue and evalue != rvalue:
704 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
704 self.log.warn("conflicting initial state for record: %r:%r <%r> %r", msg_id, rvalue, key, evalue)
705 elif evalue and not rvalue:
705 elif evalue and not rvalue:
706 record[key] = evalue
706 record[key] = evalue
707 try:
707 try:
708 self.db.update_record(msg_id, record)
708 self.db.update_record(msg_id, record)
709 except Exception:
709 except Exception:
710 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
710 self.log.error("DB Error updating record %r", msg_id, exc_info=True)
711 except KeyError:
711 except KeyError:
712 try:
712 try:
713 self.db.add_record(msg_id, record)
713 self.db.add_record(msg_id, record)
714 except Exception:
714 except Exception:
715 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
715 self.log.error("DB Error adding record %r", msg_id, exc_info=True)
716 except Exception:
716 except Exception:
717 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
717 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
718
718
719 def save_task_result(self, idents, msg):
719 def save_task_result(self, idents, msg):
720 """save the result of a completed task."""
720 """save the result of a completed task."""
721 client_id = idents[0]
721 client_id = idents[0]
722 try:
722 try:
723 msg = self.session.unserialize(msg)
723 msg = self.session.unserialize(msg)
724 except Exception:
724 except Exception:
725 self.log.error("task::invalid task result message send to %r: %r",
725 self.log.error("task::invalid task result message send to %r: %r",
726 client_id, msg, exc_info=True)
726 client_id, msg, exc_info=True)
727 return
727 return
728
728
729 parent = msg['parent_header']
729 parent = msg['parent_header']
730 if not parent:
730 if not parent:
731 # print msg
731 # print msg
732 self.log.warn("Task %r had no parent!", msg)
732 self.log.warn("Task %r had no parent!", msg)
733 return
733 return
734 msg_id = parent['msg_id']
734 msg_id = parent['msg_id']
735 if msg_id in self.unassigned:
735 if msg_id in self.unassigned:
736 self.unassigned.remove(msg_id)
736 self.unassigned.remove(msg_id)
737
737
738 header = msg['header']
738 header = msg['header']
739 engine_uuid = header.get('engine', u'')
739 engine_uuid = header.get('engine', u'')
740 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
740 eid = self.by_ident.get(cast_bytes(engine_uuid), None)
741
741
742 status = header.get('status', None)
742 status = header.get('status', None)
743
743
744 if msg_id in self.pending:
744 if msg_id in self.pending:
745 self.log.info("task::task %r finished on %s", msg_id, eid)
745 self.log.info("task::task %r finished on %s", msg_id, eid)
746 self.pending.remove(msg_id)
746 self.pending.remove(msg_id)
747 self.all_completed.add(msg_id)
747 self.all_completed.add(msg_id)
748 if eid is not None:
748 if eid is not None:
749 if status != 'aborted':
749 if status != 'aborted':
750 self.completed[eid].append(msg_id)
750 self.completed[eid].append(msg_id)
751 if msg_id in self.tasks[eid]:
751 if msg_id in self.tasks[eid]:
752 self.tasks[eid].remove(msg_id)
752 self.tasks[eid].remove(msg_id)
753 completed = header['date']
753 completed = header['date']
754 started = header.get('started', None)
754 started = header.get('started', None)
755 result = {
755 result = {
756 'result_header' : header,
756 'result_header' : header,
757 'result_content': msg['content'],
757 'result_content': msg['content'],
758 'started' : started,
758 'started' : started,
759 'completed' : completed,
759 'completed' : completed,
760 'received' : datetime.now(),
760 'received' : datetime.now(),
761 'engine_uuid': engine_uuid,
761 'engine_uuid': engine_uuid,
762 }
762 }
763
763
764 result['result_buffers'] = msg['buffers']
764 result['result_buffers'] = msg['buffers']
765 try:
765 try:
766 self.db.update_record(msg_id, result)
766 self.db.update_record(msg_id, result)
767 except Exception:
767 except Exception:
768 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
768 self.log.error("DB Error saving task request %r", msg_id, exc_info=True)
769
769
770 else:
770 else:
771 self.log.debug("task::unknown task %r finished", msg_id)
771 self.log.debug("task::unknown task %r finished", msg_id)
772
772
773 def save_task_destination(self, idents, msg):
773 def save_task_destination(self, idents, msg):
774 try:
774 try:
775 msg = self.session.unserialize(msg, content=True)
775 msg = self.session.unserialize(msg, content=True)
776 except Exception:
776 except Exception:
777 self.log.error("task::invalid task tracking message", exc_info=True)
777 self.log.error("task::invalid task tracking message", exc_info=True)
778 return
778 return
779 content = msg['content']
779 content = msg['content']
780 # print (content)
780 # print (content)
781 msg_id = content['msg_id']
781 msg_id = content['msg_id']
782 engine_uuid = content['engine_id']
782 engine_uuid = content['engine_id']
783 eid = self.by_ident[cast_bytes(engine_uuid)]
783 eid = self.by_ident[cast_bytes(engine_uuid)]
784
784
785 self.log.info("task::task %r arrived on %r", msg_id, eid)
785 self.log.info("task::task %r arrived on %r", msg_id, eid)
786 if msg_id in self.unassigned:
786 if msg_id in self.unassigned:
787 self.unassigned.remove(msg_id)
787 self.unassigned.remove(msg_id)
788 # else:
788 # else:
789 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
789 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
790
790
791 self.tasks[eid].append(msg_id)
791 self.tasks[eid].append(msg_id)
792 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
792 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
793 try:
793 try:
794 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
794 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
795 except Exception:
795 except Exception:
796 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
796 self.log.error("DB Error saving task destination %r", msg_id, exc_info=True)
797
797
798
798
799 def mia_task_request(self, idents, msg):
799 def mia_task_request(self, idents, msg):
800 raise NotImplementedError
800 raise NotImplementedError
801 client_id = idents[0]
801 client_id = idents[0]
802 # content = dict(mia=self.mia,status='ok')
802 # content = dict(mia=self.mia,status='ok')
803 # self.session.send('mia_reply', content=content, idents=client_id)
803 # self.session.send('mia_reply', content=content, idents=client_id)
804
804
805
805
806 #--------------------- IOPub Traffic ------------------------------
806 #--------------------- IOPub Traffic ------------------------------
807
807
808 def save_iopub_message(self, topics, msg):
808 def save_iopub_message(self, topics, msg):
809 """save an iopub message into the db"""
809 """save an iopub message into the db"""
810 # print (topics)
810 # print (topics)
811 try:
811 try:
812 msg = self.session.unserialize(msg, content=True)
812 msg = self.session.unserialize(msg, content=True)
813 except Exception:
813 except Exception:
814 self.log.error("iopub::invalid IOPub message", exc_info=True)
814 self.log.error("iopub::invalid IOPub message", exc_info=True)
815 return
815 return
816
816
817 parent = msg['parent_header']
817 parent = msg['parent_header']
818 if not parent:
818 if not parent:
819 self.log.warn("iopub::IOPub message lacks parent: %r", msg)
819 self.log.warn("iopub::IOPub message lacks parent: %r", msg)
820 return
820 return
821 msg_id = parent['msg_id']
821 msg_id = parent['msg_id']
822 msg_type = msg['header']['msg_type']
822 msg_type = msg['header']['msg_type']
823 content = msg['content']
823 content = msg['content']
824
824
825 # ensure msg_id is in db
825 # ensure msg_id is in db
826 try:
826 try:
827 rec = self.db.get_record(msg_id)
827 rec = self.db.get_record(msg_id)
828 except KeyError:
828 except KeyError:
829 rec = empty_record()
829 rec = empty_record()
830 rec['msg_id'] = msg_id
830 rec['msg_id'] = msg_id
831 self.db.add_record(msg_id, rec)
831 self.db.add_record(msg_id, rec)
832 # stream
832 # stream
833 d = {}
833 d = {}
834 if msg_type == 'stream':
834 if msg_type == 'stream':
835 name = content['name']
835 name = content['name']
836 s = rec[name] or ''
836 s = rec[name] or ''
837 d[name] = s + content['data']
837 d[name] = s + content['data']
838
838
839 elif msg_type == 'pyerr':
839 elif msg_type == 'pyerr':
840 d['pyerr'] = content
840 d['pyerr'] = content
841 elif msg_type == 'pyin':
841 elif msg_type == 'pyin':
842 d['pyin'] = content['code']
842 d['pyin'] = content['code']
843 elif msg_type in ('display_data', 'pyout'):
843 elif msg_type in ('display_data', 'pyout'):
844 d[msg_type] = content
844 d[msg_type] = content
845 elif msg_type == 'status':
845 elif msg_type == 'status':
846 pass
846 pass
847 else:
847 else:
848 self.log.warn("unhandled iopub msg_type: %r", msg_type)
848 self.log.warn("unhandled iopub msg_type: %r", msg_type)
849
849
850 if not d:
850 if not d:
851 return
851 return
852
852
853 try:
853 try:
854 self.db.update_record(msg_id, d)
854 self.db.update_record(msg_id, d)
855 except Exception:
855 except Exception:
856 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
856 self.log.error("DB Error saving iopub message %r", msg_id, exc_info=True)
857
857
858
858
859
859
860 #-------------------------------------------------------------------------
860 #-------------------------------------------------------------------------
861 # Registration requests
861 # Registration requests
862 #-------------------------------------------------------------------------
862 #-------------------------------------------------------------------------
863
863
864 def connection_request(self, client_id, msg):
864 def connection_request(self, client_id, msg):
865 """Reply with connection addresses for clients."""
865 """Reply with connection addresses for clients."""
866 self.log.info("client::client %r connected", client_id)
866 self.log.info("client::client %r connected", client_id)
867 content = dict(status='ok')
867 content = dict(status='ok')
868 content.update(self.client_info)
868 content.update(self.client_info)
869 jsonable = {}
869 jsonable = {}
870 for k,v in self.keytable.iteritems():
870 for k,v in self.keytable.iteritems():
871 if v not in self.dead_engines:
871 if v not in self.dead_engines:
872 jsonable[str(k)] = v.decode('ascii')
872 jsonable[str(k)] = v.decode('ascii')
873 content['engines'] = jsonable
873 content['engines'] = jsonable
874 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
874 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
875
875
876 def register_engine(self, reg, msg):
876 def register_engine(self, reg, msg):
877 """Register a new engine."""
877 """Register a new engine."""
878 content = msg['content']
878 content = msg['content']
879 try:
879 try:
880 queue = cast_bytes(content['queue'])
880 queue = cast_bytes(content['queue'])
881 except KeyError:
881 except KeyError:
882 self.log.error("registration::queue not specified", exc_info=True)
882 self.log.error("registration::queue not specified", exc_info=True)
883 return
883 return
884 heart = content.get('heartbeat', None)
884 heart = content.get('heartbeat', None)
885 if heart:
885 if heart:
886 heart = cast_bytes(heart)
886 heart = cast_bytes(heart)
887 """register a new engine, and create the socket(s) necessary"""
887 """register a new engine, and create the socket(s) necessary"""
888 eid = self._next_id
888 eid = self._next_id
889 # print (eid, queue, reg, heart)
889 # print (eid, queue, reg, heart)
890
890
891 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
891 self.log.debug("registration::register_engine(%i, %r, %r, %r)", eid, queue, reg, heart)
892
892
893 content = dict(id=eid,status='ok')
893 content = dict(id=eid,status='ok')
894 content.update(self.engine_info)
894 content.update(self.engine_info)
895 # check if requesting available IDs:
895 # check if requesting available IDs:
896 if queue in self.by_ident:
896 if queue in self.by_ident:
897 try:
897 try:
898 raise KeyError("queue_id %r in use" % queue)
898 raise KeyError("queue_id %r in use" % queue)
899 except:
899 except:
900 content = error.wrap_exception()
900 content = error.wrap_exception()
901 self.log.error("queue_id %r in use", queue, exc_info=True)
901 self.log.error("queue_id %r in use", queue, exc_info=True)
902 elif heart in self.hearts: # need to check unique hearts?
902 elif heart in self.hearts: # need to check unique hearts?
903 try:
903 try:
904 raise KeyError("heart_id %r in use" % heart)
904 raise KeyError("heart_id %r in use" % heart)
905 except:
905 except:
906 self.log.error("heart_id %r in use", heart, exc_info=True)
906 self.log.error("heart_id %r in use", heart, exc_info=True)
907 content = error.wrap_exception()
907 content = error.wrap_exception()
908 else:
908 else:
909 for h, pack in self.incoming_registrations.iteritems():
909 for h, pack in self.incoming_registrations.iteritems():
910 if heart == h:
910 if heart == h:
911 try:
911 try:
912 raise KeyError("heart_id %r in use" % heart)
912 raise KeyError("heart_id %r in use" % heart)
913 except:
913 except:
914 self.log.error("heart_id %r in use", heart, exc_info=True)
914 self.log.error("heart_id %r in use", heart, exc_info=True)
915 content = error.wrap_exception()
915 content = error.wrap_exception()
916 break
916 break
917 elif queue == pack[1]:
917 elif queue == pack[1]:
918 try:
918 try:
919 raise KeyError("queue_id %r in use" % queue)
919 raise KeyError("queue_id %r in use" % queue)
920 except:
920 except:
921 self.log.error("queue_id %r in use", queue, exc_info=True)
921 self.log.error("queue_id %r in use", queue, exc_info=True)
922 content = error.wrap_exception()
922 content = error.wrap_exception()
923 break
923 break
924
924
925 msg = self.session.send(self.query, "registration_reply",
925 msg = self.session.send(self.query, "registration_reply",
926 content=content,
926 content=content,
927 ident=reg)
927 ident=reg)
928
928
929 if content['status'] == 'ok':
929 if content['status'] == 'ok':
930 if heart in self.heartmonitor.hearts:
930 if heart in self.heartmonitor.hearts:
931 # already beating
931 # already beating
932 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
932 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
933 self.finish_registration(heart)
933 self.finish_registration(heart)
934 else:
934 else:
935 purge = lambda : self._purge_stalled_registration(heart)
935 purge = lambda : self._purge_stalled_registration(heart)
936 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
936 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
937 dc.start()
937 dc.start()
938 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
938 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
939 else:
939 else:
940 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
940 self.log.error("registration::registration %i failed: %r", eid, content['evalue'])
941 return eid
941 return eid
942
942
943 def unregister_engine(self, ident, msg):
943 def unregister_engine(self, ident, msg):
944 """Unregister an engine that explicitly requested to leave."""
944 """Unregister an engine that explicitly requested to leave."""
945 try:
945 try:
946 eid = msg['content']['id']
946 eid = msg['content']['id']
947 except:
947 except:
948 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
948 self.log.error("registration::bad engine id for unregistration: %r", ident, exc_info=True)
949 return
949 return
950 self.log.info("registration::unregister_engine(%r)", eid)
950 self.log.info("registration::unregister_engine(%r)", eid)
951 # print (eid)
951 # print (eid)
952 uuid = self.keytable[eid]
952 uuid = self.keytable[eid]
953 content=dict(id=eid, queue=uuid.decode('ascii'))
953 content=dict(id=eid, queue=uuid.decode('ascii'))
954 self.dead_engines.add(uuid)
954 self.dead_engines.add(uuid)
955 # self.ids.remove(eid)
955 # self.ids.remove(eid)
956 # uuid = self.keytable.pop(eid)
956 # uuid = self.keytable.pop(eid)
957 #
957 #
958 # ec = self.engines.pop(eid)
958 # ec = self.engines.pop(eid)
959 # self.hearts.pop(ec.heartbeat)
959 # self.hearts.pop(ec.heartbeat)
960 # self.by_ident.pop(ec.queue)
960 # self.by_ident.pop(ec.queue)
961 # self.completed.pop(eid)
961 # self.completed.pop(eid)
962 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
962 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
963 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
963 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
964 dc.start()
964 dc.start()
965 ############## TODO: HANDLE IT ################
965 ############## TODO: HANDLE IT ################
966
966
967 if self.notifier:
967 if self.notifier:
968 self.session.send(self.notifier, "unregistration_notification", content=content)
968 self.session.send(self.notifier, "unregistration_notification", content=content)
969
969
970 def _handle_stranded_msgs(self, eid, uuid):
970 def _handle_stranded_msgs(self, eid, uuid):
971 """Handle messages known to be on an engine when the engine unregisters.
971 """Handle messages known to be on an engine when the engine unregisters.
972
972
973 It is possible that this will fire prematurely - that is, an engine will
973 It is possible that this will fire prematurely - that is, an engine will
974 go down after completing a result, and the client will be notified
974 go down after completing a result, and the client will be notified
975 that the result failed and later receive the actual result.
975 that the result failed and later receive the actual result.
976 """
976 """
977
977
978 outstanding = self.queues[eid]
978 outstanding = self.queues[eid]
979
979
980 for msg_id in outstanding:
980 for msg_id in outstanding:
981 self.pending.remove(msg_id)
981 self.pending.remove(msg_id)
982 self.all_completed.add(msg_id)
982 self.all_completed.add(msg_id)
983 try:
983 try:
984 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
984 raise error.EngineError("Engine %r died while running task %r" % (eid, msg_id))
985 except:
985 except:
986 content = error.wrap_exception()
986 content = error.wrap_exception()
987 # build a fake header:
987 # build a fake header:
988 header = {}
988 header = {}
989 header['engine'] = uuid
989 header['engine'] = uuid
990 header['date'] = datetime.now()
990 header['date'] = datetime.now()
991 rec = dict(result_content=content, result_header=header, result_buffers=[])
991 rec = dict(result_content=content, result_header=header, result_buffers=[])
992 rec['completed'] = header['date']
992 rec['completed'] = header['date']
993 rec['engine_uuid'] = uuid
993 rec['engine_uuid'] = uuid
994 try:
994 try:
995 self.db.update_record(msg_id, rec)
995 self.db.update_record(msg_id, rec)
996 except Exception:
996 except Exception:
997 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
997 self.log.error("DB Error handling stranded msg %r", msg_id, exc_info=True)
998
998
999
999
1000 def finish_registration(self, heart):
1000 def finish_registration(self, heart):
1001 """Second half of engine registration, called after our HeartMonitor
1001 """Second half of engine registration, called after our HeartMonitor
1002 has received a beat from the Engine's Heart."""
1002 has received a beat from the Engine's Heart."""
1003 try:
1003 try:
1004 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
1004 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
1005 except KeyError:
1005 except KeyError:
1006 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1006 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
1007 return
1007 return
1008 self.log.info("registration::finished registering engine %i:%r", eid, queue)
1008 self.log.info("registration::finished registering engine %i:%r", eid, queue)
1009 if purge is not None:
1009 if purge is not None:
1010 purge.stop()
1010 purge.stop()
1011 control = queue
1011 control = queue
1012 self.ids.add(eid)
1012 self.ids.add(eid)
1013 self.keytable[eid] = queue
1013 self.keytable[eid] = queue
1014 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
1014 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
1015 control=control, heartbeat=heart)
1015 control=control, heartbeat=heart)
1016 self.by_ident[queue] = eid
1016 self.by_ident[queue] = eid
1017 self.queues[eid] = list()
1017 self.queues[eid] = list()
1018 self.tasks[eid] = list()
1018 self.tasks[eid] = list()
1019 self.completed[eid] = list()
1019 self.completed[eid] = list()
1020 self.hearts[heart] = eid
1020 self.hearts[heart] = eid
1021 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
1021 content = dict(id=eid, queue=self.engines[eid].queue.decode('ascii'))
1022 if self.notifier:
1022 if self.notifier:
1023 self.session.send(self.notifier, "registration_notification", content=content)
1023 self.session.send(self.notifier, "registration_notification", content=content)
1024 self.log.info("engine::Engine Connected: %i", eid)
1024 self.log.info("engine::Engine Connected: %i", eid)
1025
1025
1026 def _purge_stalled_registration(self, heart):
1026 def _purge_stalled_registration(self, heart):
1027 if heart in self.incoming_registrations:
1027 if heart in self.incoming_registrations:
1028 eid = self.incoming_registrations.pop(heart)[0]
1028 eid = self.incoming_registrations.pop(heart)[0]
1029 self.log.info("registration::purging stalled registration: %i", eid)
1029 self.log.info("registration::purging stalled registration: %i", eid)
1030 else:
1030 else:
1031 pass
1031 pass
1032
1032
1033 #-------------------------------------------------------------------------
1033 #-------------------------------------------------------------------------
1034 # Client Requests
1034 # Client Requests
1035 #-------------------------------------------------------------------------
1035 #-------------------------------------------------------------------------
1036
1036
1037 def shutdown_request(self, client_id, msg):
1037 def shutdown_request(self, client_id, msg):
1038 """handle shutdown request."""
1038 """handle shutdown request."""
1039 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1039 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
1040 # also notify other clients of shutdown
1040 # also notify other clients of shutdown
1041 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1041 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
1042 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1042 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
1043 dc.start()
1043 dc.start()
1044
1044
    def _shutdown(self):
        """Terminate the Hub process after a short delay."""
        self.log.info("hub::hub shutting down.")
        # brief pause to let final log/notification messages flush
        time.sleep(0.1)
        sys.exit(0)
1049
1049
1050
1050
1051 def check_load(self, client_id, msg):
1051 def check_load(self, client_id, msg):
1052 content = msg['content']
1052 content = msg['content']
1053 try:
1053 try:
1054 targets = content['targets']
1054 targets = content['targets']
1055 targets = self._validate_targets(targets)
1055 targets = self._validate_targets(targets)
1056 except:
1056 except:
1057 content = error.wrap_exception()
1057 content = error.wrap_exception()
1058 self.session.send(self.query, "hub_error",
1058 self.session.send(self.query, "hub_error",
1059 content=content, ident=client_id)
1059 content=content, ident=client_id)
1060 return
1060 return
1061
1061
1062 content = dict(status='ok')
1062 content = dict(status='ok')
1063 # loads = {}
1063 # loads = {}
1064 for t in targets:
1064 for t in targets:
1065 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1065 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1066 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1066 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1067
1067
1068
1068
1069 def queue_status(self, client_id, msg):
1069 def queue_status(self, client_id, msg):
1070 """Return the Queue status of one or more targets.
1070 """Return the Queue status of one or more targets.
1071 if verbose: return the msg_ids
1071 if verbose: return the msg_ids
1072 else: return len of each type.
1072 else: return len of each type.
1073 keys: queue (pending MUX jobs)
1073 keys: queue (pending MUX jobs)
1074 tasks (pending Task jobs)
1074 tasks (pending Task jobs)
1075 completed (finished jobs from both queues)"""
1075 completed (finished jobs from both queues)"""
1076 content = msg['content']
1076 content = msg['content']
1077 targets = content['targets']
1077 targets = content['targets']
1078 try:
1078 try:
1079 targets = self._validate_targets(targets)
1079 targets = self._validate_targets(targets)
1080 except:
1080 except:
1081 content = error.wrap_exception()
1081 content = error.wrap_exception()
1082 self.session.send(self.query, "hub_error",
1082 self.session.send(self.query, "hub_error",
1083 content=content, ident=client_id)
1083 content=content, ident=client_id)
1084 return
1084 return
1085 verbose = content.get('verbose', False)
1085 verbose = content.get('verbose', False)
1086 content = dict(status='ok')
1086 content = dict(status='ok')
1087 for t in targets:
1087 for t in targets:
1088 queue = self.queues[t]
1088 queue = self.queues[t]
1089 completed = self.completed[t]
1089 completed = self.completed[t]
1090 tasks = self.tasks[t]
1090 tasks = self.tasks[t]
1091 if not verbose:
1091 if not verbose:
1092 queue = len(queue)
1092 queue = len(queue)
1093 completed = len(completed)
1093 completed = len(completed)
1094 tasks = len(tasks)
1094 tasks = len(tasks)
1095 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1095 content[str(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1096 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1096 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1097 # print (content)
1097 # print (content)
1098 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1098 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1099
1099
1100 def purge_results(self, client_id, msg):
1100 def purge_results(self, client_id, msg):
1101 """Purge results from memory. This method is more valuable before we move
1101 """Purge results from memory. This method is more valuable before we move
1102 to a DB based message storage mechanism."""
1102 to a DB based message storage mechanism."""
1103 content = msg['content']
1103 content = msg['content']
1104 self.log.info("Dropping records with %s", content)
1104 self.log.info("Dropping records with %s", content)
1105 msg_ids = content.get('msg_ids', [])
1105 msg_ids = content.get('msg_ids', [])
1106 reply = dict(status='ok')
1106 reply = dict(status='ok')
1107 if msg_ids == 'all':
1107 if msg_ids == 'all':
1108 try:
1108 try:
1109 self.db.drop_matching_records(dict(completed={'$ne':None}))
1109 self.db.drop_matching_records(dict(completed={'$ne':None}))
1110 except Exception:
1110 except Exception:
1111 reply = error.wrap_exception()
1111 reply = error.wrap_exception()
1112 else:
1112 else:
1113 pending = filter(lambda m: m in self.pending, msg_ids)
1113 pending = filter(lambda m: m in self.pending, msg_ids)
1114 if pending:
1114 if pending:
1115 try:
1115 try:
1116 raise IndexError("msg pending: %r" % pending[0])
1116 raise IndexError("msg pending: %r" % pending[0])
1117 except:
1117 except:
1118 reply = error.wrap_exception()
1118 reply = error.wrap_exception()
1119 else:
1119 else:
1120 try:
1120 try:
1121 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1121 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1122 except Exception:
1122 except Exception:
1123 reply = error.wrap_exception()
1123 reply = error.wrap_exception()
1124
1124
1125 if reply['status'] == 'ok':
1125 if reply['status'] == 'ok':
1126 eids = content.get('engine_ids', [])
1126 eids = content.get('engine_ids', [])
1127 for eid in eids:
1127 for eid in eids:
1128 if eid not in self.engines:
1128 if eid not in self.engines:
1129 try:
1129 try:
1130 raise IndexError("No such engine: %i" % eid)
1130 raise IndexError("No such engine: %i" % eid)
1131 except:
1131 except:
1132 reply = error.wrap_exception()
1132 reply = error.wrap_exception()
1133 break
1133 break
1134 uid = self.engines[eid].queue
1134 uid = self.engines[eid].queue
1135 try:
1135 try:
1136 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1136 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1137 except Exception:
1137 except Exception:
1138 reply = error.wrap_exception()
1138 reply = error.wrap_exception()
1139 break
1139 break
1140
1140
1141 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1141 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1142
1142
1143 def resubmit_task(self, client_id, msg):
1143 def resubmit_task(self, client_id, msg):
1144 """Resubmit one or more tasks."""
1144 """Resubmit one or more tasks."""
1145 def finish(reply):
1145 def finish(reply):
1146 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1146 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1147
1147
1148 content = msg['content']
1148 content = msg['content']
1149 msg_ids = content['msg_ids']
1149 msg_ids = content['msg_ids']
1150 reply = dict(status='ok')
1150 reply = dict(status='ok')
1151 try:
1151 try:
1152 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1152 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1153 'header', 'content', 'buffers'])
1153 'header', 'content', 'buffers'])
1154 except Exception:
1154 except Exception:
1155 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1155 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1156 return finish(error.wrap_exception())
1156 return finish(error.wrap_exception())
1157
1157
1158 # validate msg_ids
1158 # validate msg_ids
1159 found_ids = [ rec['msg_id'] for rec in records ]
1159 found_ids = [ rec['msg_id'] for rec in records ]
1160 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1160 pending_ids = [ msg_id for msg_id in found_ids if msg_id in self.pending ]
1161 if len(records) > len(msg_ids):
1161 if len(records) > len(msg_ids):
1162 try:
1162 try:
1163 raise RuntimeError("DB appears to be in an inconsistent state."
1163 raise RuntimeError("DB appears to be in an inconsistent state."
1164 "More matching records were found than should exist")
1164 "More matching records were found than should exist")
1165 except Exception:
1165 except Exception:
1166 return finish(error.wrap_exception())
1166 return finish(error.wrap_exception())
1167 elif len(records) < len(msg_ids):
1167 elif len(records) < len(msg_ids):
1168 missing = [ m for m in msg_ids if m not in found_ids ]
1168 missing = [ m for m in msg_ids if m not in found_ids ]
1169 try:
1169 try:
1170 raise KeyError("No such msg(s): %r" % missing)
1170 raise KeyError("No such msg(s): %r" % missing)
1171 except KeyError:
1171 except KeyError:
1172 return finish(error.wrap_exception())
1172 return finish(error.wrap_exception())
1173 elif pending_ids:
1173 elif pending_ids:
1174 pass
1174 pass
1175 # no need to raise on resubmit of pending task, now that we
1175 # no need to raise on resubmit of pending task, now that we
1176 # resubmit under new ID, but do we want to raise anyway?
1176 # resubmit under new ID, but do we want to raise anyway?
1177 # msg_id = invalid_ids[0]
1177 # msg_id = invalid_ids[0]
1178 # try:
1178 # try:
1179 # raise ValueError("Task(s) %r appears to be inflight" % )
1179 # raise ValueError("Task(s) %r appears to be inflight" % )
1180 # except Exception:
1180 # except Exception:
1181 # return finish(error.wrap_exception())
1181 # return finish(error.wrap_exception())
1182
1182
1183 # mapping of original IDs to resubmitted IDs
1183 # mapping of original IDs to resubmitted IDs
1184 resubmitted = {}
1184 resubmitted = {}
1185
1185
1186 # send the messages
1186 # send the messages
1187 for rec in records:
1187 for rec in records:
1188 header = rec['header']
1188 header = rec['header']
1189 msg = self.session.msg(header['msg_type'], parent=header)
1189 msg = self.session.msg(header['msg_type'], parent=header)
1190 msg_id = msg['msg_id']
1190 msg_id = msg['msg_id']
1191 msg['content'] = rec['content']
1191 msg['content'] = rec['content']
1192
1192
1193 # use the old header, but update msg_id and timestamp
1193 # use the old header, but update msg_id and timestamp
1194 fresh = msg['header']
1194 fresh = msg['header']
1195 header['msg_id'] = fresh['msg_id']
1195 header['msg_id'] = fresh['msg_id']
1196 header['date'] = fresh['date']
1196 header['date'] = fresh['date']
1197 msg['header'] = header
1197 msg['header'] = header
1198
1198
1199 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1199 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1200
1200
1201 resubmitted[rec['msg_id']] = msg_id
1201 resubmitted[rec['msg_id']] = msg_id
1202 self.pending.add(msg_id)
1202 self.pending.add(msg_id)
1203 msg['buffers'] = rec['buffers']
1203 msg['buffers'] = rec['buffers']
1204 try:
1204 try:
1205 self.db.add_record(msg_id, init_record(msg))
1205 self.db.add_record(msg_id, init_record(msg))
1206 except Exception:
1206 except Exception:
1207 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1207 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1208 return finish(error.wrap_exception())
1208
1209
1209 finish(dict(status='ok', resubmitted=resubmitted))
1210 finish(dict(status='ok', resubmitted=resubmitted))
1210
1211
1211 # store the new IDs in the Task DB
1212 # store the new IDs in the Task DB
1212 for msg_id, resubmit_id in resubmitted.iteritems():
1213 for msg_id, resubmit_id in resubmitted.iteritems():
1213 try:
1214 try:
1214 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1215 self.db.update_record(msg_id, {'resubmitted' : resubmit_id})
1215 except Exception:
1216 except Exception:
1216 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1217 self.log.error("db::DB Error updating record: %s", msg_id, exc_info=True)
1217
1218
1218
1219
1219 def _extract_record(self, rec):
1220 def _extract_record(self, rec):
1220 """decompose a TaskRecord dict into subsection of reply for get_result"""
1221 """decompose a TaskRecord dict into subsection of reply for get_result"""
1221 io_dict = {}
1222 io_dict = {}
1222 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1223 for key in ('pyin', 'pyout', 'pyerr', 'stdout', 'stderr'):
1223 io_dict[key] = rec[key]
1224 io_dict[key] = rec[key]
1224 content = { 'result_content': rec['result_content'],
1225 content = { 'result_content': rec['result_content'],
1225 'header': rec['header'],
1226 'header': rec['header'],
1226 'result_header' : rec['result_header'],
1227 'result_header' : rec['result_header'],
1227 'received' : rec['received'],
1228 'received' : rec['received'],
1228 'io' : io_dict,
1229 'io' : io_dict,
1229 }
1230 }
1230 if rec['result_buffers']:
1231 if rec['result_buffers']:
1231 buffers = map(bytes, rec['result_buffers'])
1232 buffers = map(bytes, rec['result_buffers'])
1232 else:
1233 else:
1233 buffers = []
1234 buffers = []
1234
1235
1235 return content, buffers
1236 return content, buffers
1236
1237
1237 def get_results(self, client_id, msg):
1238 def get_results(self, client_id, msg):
1238 """Get the result of 1 or more messages."""
1239 """Get the result of 1 or more messages."""
1239 content = msg['content']
1240 content = msg['content']
1240 msg_ids = sorted(set(content['msg_ids']))
1241 msg_ids = sorted(set(content['msg_ids']))
1241 statusonly = content.get('status_only', False)
1242 statusonly = content.get('status_only', False)
1242 pending = []
1243 pending = []
1243 completed = []
1244 completed = []
1244 content = dict(status='ok')
1245 content = dict(status='ok')
1245 content['pending'] = pending
1246 content['pending'] = pending
1246 content['completed'] = completed
1247 content['completed'] = completed
1247 buffers = []
1248 buffers = []
1248 if not statusonly:
1249 if not statusonly:
1249 try:
1250 try:
1250 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1251 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1251 # turn match list into dict, for faster lookup
1252 # turn match list into dict, for faster lookup
1252 records = {}
1253 records = {}
1253 for rec in matches:
1254 for rec in matches:
1254 records[rec['msg_id']] = rec
1255 records[rec['msg_id']] = rec
1255 except Exception:
1256 except Exception:
1256 content = error.wrap_exception()
1257 content = error.wrap_exception()
1257 self.session.send(self.query, "result_reply", content=content,
1258 self.session.send(self.query, "result_reply", content=content,
1258 parent=msg, ident=client_id)
1259 parent=msg, ident=client_id)
1259 return
1260 return
1260 else:
1261 else:
1261 records = {}
1262 records = {}
1262 for msg_id in msg_ids:
1263 for msg_id in msg_ids:
1263 if msg_id in self.pending:
1264 if msg_id in self.pending:
1264 pending.append(msg_id)
1265 pending.append(msg_id)
1265 elif msg_id in self.all_completed:
1266 elif msg_id in self.all_completed:
1266 completed.append(msg_id)
1267 completed.append(msg_id)
1267 if not statusonly:
1268 if not statusonly:
1268 c,bufs = self._extract_record(records[msg_id])
1269 c,bufs = self._extract_record(records[msg_id])
1269 content[msg_id] = c
1270 content[msg_id] = c
1270 buffers.extend(bufs)
1271 buffers.extend(bufs)
1271 elif msg_id in records:
1272 elif msg_id in records:
1272 if rec['completed']:
1273 if rec['completed']:
1273 completed.append(msg_id)
1274 completed.append(msg_id)
1274 c,bufs = self._extract_record(records[msg_id])
1275 c,bufs = self._extract_record(records[msg_id])
1275 content[msg_id] = c
1276 content[msg_id] = c
1276 buffers.extend(bufs)
1277 buffers.extend(bufs)
1277 else:
1278 else:
1278 pending.append(msg_id)
1279 pending.append(msg_id)
1279 else:
1280 else:
1280 try:
1281 try:
1281 raise KeyError('No such message: '+msg_id)
1282 raise KeyError('No such message: '+msg_id)
1282 except:
1283 except:
1283 content = error.wrap_exception()
1284 content = error.wrap_exception()
1284 break
1285 break
1285 self.session.send(self.query, "result_reply", content=content,
1286 self.session.send(self.query, "result_reply", content=content,
1286 parent=msg, ident=client_id,
1287 parent=msg, ident=client_id,
1287 buffers=buffers)
1288 buffers=buffers)
1288
1289
1289 def get_history(self, client_id, msg):
1290 def get_history(self, client_id, msg):
1290 """Get a list of all msg_ids in our DB records"""
1291 """Get a list of all msg_ids in our DB records"""
1291 try:
1292 try:
1292 msg_ids = self.db.get_history()
1293 msg_ids = self.db.get_history()
1293 except Exception as e:
1294 except Exception as e:
1294 content = error.wrap_exception()
1295 content = error.wrap_exception()
1295 else:
1296 else:
1296 content = dict(status='ok', history=msg_ids)
1297 content = dict(status='ok', history=msg_ids)
1297
1298
1298 self.session.send(self.query, "history_reply", content=content,
1299 self.session.send(self.query, "history_reply", content=content,
1299 parent=msg, ident=client_id)
1300 parent=msg, ident=client_id)
1300
1301
1301 def db_query(self, client_id, msg):
1302 def db_query(self, client_id, msg):
1302 """Perform a raw query on the task record database."""
1303 """Perform a raw query on the task record database."""
1303 content = msg['content']
1304 content = msg['content']
1304 query = content.get('query', {})
1305 query = content.get('query', {})
1305 keys = content.get('keys', None)
1306 keys = content.get('keys', None)
1306 buffers = []
1307 buffers = []
1307 empty = list()
1308 empty = list()
1308 try:
1309 try:
1309 records = self.db.find_records(query, keys)
1310 records = self.db.find_records(query, keys)
1310 except Exception as e:
1311 except Exception as e:
1311 content = error.wrap_exception()
1312 content = error.wrap_exception()
1312 else:
1313 else:
1313 # extract buffers from reply content:
1314 # extract buffers from reply content:
1314 if keys is not None:
1315 if keys is not None:
1315 buffer_lens = [] if 'buffers' in keys else None
1316 buffer_lens = [] if 'buffers' in keys else None
1316 result_buffer_lens = [] if 'result_buffers' in keys else None
1317 result_buffer_lens = [] if 'result_buffers' in keys else None
1317 else:
1318 else:
1318 buffer_lens = None
1319 buffer_lens = None
1319 result_buffer_lens = None
1320 result_buffer_lens = None
1320
1321
1321 for rec in records:
1322 for rec in records:
1322 # buffers may be None, so double check
1323 # buffers may be None, so double check
1323 b = rec.pop('buffers', empty) or empty
1324 b = rec.pop('buffers', empty) or empty
1324 if buffer_lens is not None:
1325 if buffer_lens is not None:
1325 buffer_lens.append(len(b))
1326 buffer_lens.append(len(b))
1326 buffers.extend(b)
1327 buffers.extend(b)
1327 rb = rec.pop('result_buffers', empty) or empty
1328 rb = rec.pop('result_buffers', empty) or empty
1328 if result_buffer_lens is not None:
1329 if result_buffer_lens is not None:
1329 result_buffer_lens.append(len(rb))
1330 result_buffer_lens.append(len(rb))
1330 buffers.extend(rb)
1331 buffers.extend(rb)
1331 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1332 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1332 result_buffer_lens=result_buffer_lens)
1333 result_buffer_lens=result_buffer_lens)
1333 # self.log.debug (content)
1334 # self.log.debug (content)
1334 self.session.send(self.query, "db_reply", content=content,
1335 self.session.send(self.query, "db_reply", content=content,
1335 parent=msg, ident=client_id,
1336 parent=msg, ident=client_id,
1336 buffers=buffers)
1337 buffers=buffers)
1337
1338
General Comments 0
You need to be logged in to leave comments. Login now