##// END OF EJS Templates
remove all PAIR sockets, Merge registration+query
MinRK -
Show More
@@ -1,1569 +1,1570 b''
1 """A semi-synchronous Client for the ZMQ controller"""
1 """A semi-synchronous Client for the ZMQ controller"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import os
13 import os
14 import json
14 import json
15 import time
15 import time
16 import warnings
16 import warnings
17 from datetime import datetime
17 from datetime import datetime
18 from getpass import getpass
18 from getpass import getpass
19 from pprint import pprint
19 from pprint import pprint
20
20
21 pjoin = os.path.join
21 pjoin = os.path.join
22
22
23 import zmq
23 import zmq
24 # from zmq.eventloop import ioloop, zmqstream
24 # from zmq.eventloop import ioloop, zmqstream
25
25
26 from IPython.utils.path import get_ipython_dir
26 from IPython.utils.path import get_ipython_dir
27 from IPython.utils.pickleutil import Reference
27 from IPython.utils.pickleutil import Reference
28 from IPython.utils.traitlets import (HasTraits, Int, Instance, CUnicode,
28 from IPython.utils.traitlets import (HasTraits, Int, Instance, CUnicode,
29 Dict, List, Bool, Str, Set)
29 Dict, List, Bool, Str, Set)
30 from IPython.external.decorator import decorator
30 from IPython.external.decorator import decorator
31 from IPython.external.ssh import tunnel
31 from IPython.external.ssh import tunnel
32
32
33 from . import error
33 from . import error
34 from . import map as Map
34 from . import map as Map
35 from . import util
35 from . import util
36 from . import streamsession as ss
36 from . import streamsession as ss
37 from .asyncresult import AsyncResult, AsyncMapResult, AsyncHubResult
37 from .asyncresult import AsyncResult, AsyncMapResult, AsyncHubResult
38 from .clusterdir import ClusterDir, ClusterDirError
38 from .clusterdir import ClusterDir, ClusterDirError
39 from .dependency import Dependency, depend, require, dependent
39 from .dependency import Dependency, depend, require, dependent
40 from .remotefunction import remote, parallel, ParallelFunction, RemoteFunction
40 from .remotefunction import remote, parallel, ParallelFunction, RemoteFunction
41 from .util import ReverseDict, validate_url, disambiguate_url
41 from .util import ReverseDict, validate_url, disambiguate_url
42 from .view import DirectView, LoadBalancedView
42 from .view import DirectView, LoadBalancedView
43
43
44 #--------------------------------------------------------------------------
44 #--------------------------------------------------------------------------
45 # helpers for implementing old MEC API via client.apply
45 # helpers for implementing old MEC API via client.apply
46 #--------------------------------------------------------------------------
46 #--------------------------------------------------------------------------
47
47
48 def _push(user_ns, **ns):
48 def _push(user_ns, **ns):
49 """helper method for implementing `client.push` via `client.apply`"""
49 """helper method for implementing `client.push` via `client.apply`"""
50 user_ns.update(ns)
50 user_ns.update(ns)
51
51
52 def _pull(user_ns, keys):
52 def _pull(user_ns, keys):
53 """helper method for implementing `client.pull` via `client.apply`"""
53 """helper method for implementing `client.pull` via `client.apply`"""
54 if isinstance(keys, (list,tuple, set)):
54 if isinstance(keys, (list,tuple, set)):
55 for key in keys:
55 for key in keys:
56 if not user_ns.has_key(key):
56 if not user_ns.has_key(key):
57 raise NameError("name '%s' is not defined"%key)
57 raise NameError("name '%s' is not defined"%key)
58 return map(user_ns.get, keys)
58 return map(user_ns.get, keys)
59 else:
59 else:
60 if not user_ns.has_key(keys):
60 if not user_ns.has_key(keys):
61 raise NameError("name '%s' is not defined"%keys)
61 raise NameError("name '%s' is not defined"%keys)
62 return user_ns.get(keys)
62 return user_ns.get(keys)
63
63
64 def _clear(user_ns):
64 def _clear(user_ns):
65 """helper method for implementing `client.clear` via `client.apply`"""
65 """helper method for implementing `client.clear` via `client.apply`"""
66 user_ns.clear()
66 user_ns.clear()
67
67
68 def _execute(user_ns, code):
68 def _execute(user_ns, code):
69 """helper method for implementing `client.execute` via `client.apply`"""
69 """helper method for implementing `client.execute` via `client.apply`"""
70 exec code in user_ns
70 exec code in user_ns
71
71
72
72
73 #--------------------------------------------------------------------------
73 #--------------------------------------------------------------------------
74 # Decorators for Client methods
74 # Decorators for Client methods
75 #--------------------------------------------------------------------------
75 #--------------------------------------------------------------------------
76
76
@decorator
def spinfirst(f, self, *args, **kwargs):
    """Call spin() to sync state prior to calling the method."""
    # flush incoming results/notifications so `f` operates on
    # up-to-date client state
    self.spin()
    return f(self, *args, **kwargs)
82
82
@decorator
def defaultblock(f, self, *args, **kwargs):
    """Default to self.block; preserve self.block.

    If the call passes block=None (or omits it), fall back to the
    client's current ``block`` attribute; ``self.block`` is restored
    afterwards no matter how `f` exits.
    """
    requested = kwargs.get('block', None)
    if requested is None:
        requested = self.block
    previous = self.block
    self.block = requested
    try:
        result = f(self, *args, **kwargs)
    finally:
        # restore the caller-visible default even if f raised
        self.block = previous
    return result
95
95
96
96
97 #--------------------------------------------------------------------------
97 #--------------------------------------------------------------------------
98 # Classes
98 # Classes
99 #--------------------------------------------------------------------------
99 #--------------------------------------------------------------------------
100
100
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        # the fixed set of allowed keys, with their initial values
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,

              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
            }
        # NOTE: dict.update bypasses our strict __setitem__, which is what
        # allows the initial key set to be installed here
        self.update(md)
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # `key in self` is an O(1) membership test (and works on
        # Python 3, unlike iterkeys())
        if key in self:
            return self[key]
        else:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict"""
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key in self:
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
151
151
152
152
153 class Client(HasTraits):
153 class Client(HasTraits):
154 """A semi-synchronous client to the IPython ZMQ controller
154 """A semi-synchronous client to the IPython ZMQ controller
155
155
156 Parameters
156 Parameters
157 ----------
157 ----------
158
158
159 url_or_file : bytes; zmq url or path to ipcontroller-client.json
159 url_or_file : bytes; zmq url or path to ipcontroller-client.json
160 Connection information for the Hub's registration. If a json connector
160 Connection information for the Hub's registration. If a json connector
161 file is given, then likely no further configuration is necessary.
161 file is given, then likely no further configuration is necessary.
162 [Default: use profile]
162 [Default: use profile]
163 profile : bytes
163 profile : bytes
164 The name of the Cluster profile to be used to find connector information.
164 The name of the Cluster profile to be used to find connector information.
165 [Default: 'default']
165 [Default: 'default']
166 context : zmq.Context
166 context : zmq.Context
167 Pass an existing zmq.Context instance, otherwise the client will create its own.
167 Pass an existing zmq.Context instance, otherwise the client will create its own.
168 username : bytes
168 username : bytes
169 set username to be passed to the Session object
169 set username to be passed to the Session object
170 debug : bool
170 debug : bool
171 flag for lots of message printing for debug purposes
171 flag for lots of message printing for debug purposes
172
172
173 #-------------- ssh related args ----------------
173 #-------------- ssh related args ----------------
174 # These are args for configuring the ssh tunnel to be used
174 # These are args for configuring the ssh tunnel to be used
175 # credentials are used to forward connections over ssh to the Controller
175 # credentials are used to forward connections over ssh to the Controller
176 # Note that the ip given in `addr` needs to be relative to sshserver
176 # Note that the ip given in `addr` needs to be relative to sshserver
177 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
177 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
178 # and set sshserver as the same machine the Controller is on. However,
178 # and set sshserver as the same machine the Controller is on. However,
179 # the only requirement is that sshserver is able to see the Controller
179 # the only requirement is that sshserver is able to see the Controller
180 # (i.e. is within the same trusted network).
180 # (i.e. is within the same trusted network).
181
181
182 sshserver : str
182 sshserver : str
183 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
183 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
184 If keyfile or password is specified, and this is not, it will default to
184 If keyfile or password is specified, and this is not, it will default to
185 the ip given in addr.
185 the ip given in addr.
186 sshkey : str; path to public ssh key file
186 sshkey : str; path to public ssh key file
187 This specifies a key to be used in ssh login, default None.
187 This specifies a key to be used in ssh login, default None.
188 Regular default ssh keys will be used without specifying this argument.
188 Regular default ssh keys will be used without specifying this argument.
189 password : str
189 password : str
190 Your ssh password to sshserver. Note that if this is left None,
190 Your ssh password to sshserver. Note that if this is left None,
191 you will be prompted for it if passwordless key based login is unavailable.
191 you will be prompted for it if passwordless key based login is unavailable.
192 paramiko : bool
192 paramiko : bool
193 flag for whether to use paramiko instead of shell ssh for tunneling.
193 flag for whether to use paramiko instead of shell ssh for tunneling.
194 [default: True on win32, False else]
194 [default: True on win32, False else]
195
195
196 #------- exec authentication args -------
196 #------- exec authentication args -------
197 # If even localhost is untrusted, you can have some protection against
197 # If even localhost is untrusted, you can have some protection against
198 # unauthorized execution by using a key. Messages are still sent
198 # unauthorized execution by using a key. Messages are still sent
199 # as cleartext, so if someone can snoop your loopback traffic this will
199 # as cleartext, so if someone can snoop your loopback traffic this will
200 # not help against malicious attacks.
200 # not help against malicious attacks.
201
201
202 exec_key : str
202 exec_key : str
203 an authentication key or file containing a key
203 an authentication key or file containing a key
204 default: None
204 default: None
205
205
206
206
207 Attributes
207 Attributes
208 ----------
208 ----------
209
209
210 ids : set of int engine IDs
210 ids : set of int engine IDs
211 requesting the ids attribute always synchronizes
211 requesting the ids attribute always synchronizes
212 the registration state. To request ids without synchronization,
212 the registration state. To request ids without synchronization,
213 use semi-private _ids attributes.
213 use semi-private _ids attributes.
214
214
215 history : list of msg_ids
215 history : list of msg_ids
216 a list of msg_ids, keeping track of all the execution
216 a list of msg_ids, keeping track of all the execution
217 messages you have submitted in order.
217 messages you have submitted in order.
218
218
219 outstanding : set of msg_ids
219 outstanding : set of msg_ids
220 a set of msg_ids that have been submitted, but whose
220 a set of msg_ids that have been submitted, but whose
221 results have not yet been received.
221 results have not yet been received.
222
222
223 results : dict
223 results : dict
224 a dict of all our results, keyed by msg_id
224 a dict of all our results, keyed by msg_id
225
225
226 block : bool
226 block : bool
227 determines default behavior when block not specified
227 determines default behavior when block not specified
228 in execution methods
228 in execution methods
229
229
230 Methods
230 Methods
231 -------
231 -------
232
232
233 spin
233 spin
234 flushes incoming results and registration state changes
234 flushes incoming results and registration state changes
235 control methods spin, and requesting `ids` also ensures up to date
235 control methods spin, and requesting `ids` also ensures up to date
236
236
237 barrier
237 barrier
238 wait on one or more msg_ids
238 wait on one or more msg_ids
239
239
240 execution methods
240 execution methods
241 apply
241 apply
242 legacy: execute, run
242 legacy: execute, run
243
243
244 query methods
244 query methods
245 queue_status, get_result, purge
245 queue_status, get_result, purge
246
246
247 control methods
247 control methods
248 abort, shutdown
248 abort, shutdown
249
249
250 """
250 """
251
251
252
252
    # --- public state -----------------------------------------------------
    block = Bool(False)          # default blocking behavior for execution methods
    outstanding = Set()          # msg_ids submitted but not yet resolved
    results = Instance('collections.defaultdict', (dict,))      # msg_id -> result
    metadata = Instance('collections.defaultdict', (Metadata,)) # msg_id -> Metadata
    history = List()             # all submitted msg_ids, in order
    debug = Bool(False)          # print messages as they go by
    profile=CUnicode('default')  # name of the cluster profile in use

    # --- internal state ---------------------------------------------------
    _outstanding_dict = Instance('collections.defaultdict', (set,))  # engine uuid -> outstanding msg_ids
    _ids = List()                # registered engine ids (ints)
    _connected=Bool(False)       # guard against reconnecting in _connect()
    _ssh=Bool(False)             # whether connections are tunneled over ssh
    _context = Instance('zmq.Context')
    _config = Dict()             # connection info (from args / json connector file)
    _engines=Instance(ReverseDict, (), {})  # two-way engine id <-> uuid map
    # registration and query are now merged onto a single XREQ socket
    # (the old separate _registration_socket / PAIR sockets were removed):
    # _hub_socket=Instance('zmq.Socket')
    _query_socket=Instance('zmq.Socket')
    _control_socket=Instance('zmq.Socket')
    _iopub_socket=Instance('zmq.Socket')
    _notification_socket=Instance('zmq.Socket')
    _mux_socket=Instance('zmq.Socket')
    _task_socket=Instance('zmq.Socket')
    _task_scheme=Str()           # 'pure' (ZMQ XREP) or a Python scheduler scheme
    _balanced_views=Dict()       # cache of LoadBalancedViews, keyed by targets
    _direct_views=Dict()         # cache of DirectViews, keyed by targets
    _closed = False              # set once close() has run
279
279
    def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
            context=None, username=None, debug=False, exec_key=None,
            sshserver=None, sshkey=None, password=None, paramiko=None,
            ):
        """Create a Client connected to a controller.

        Resolves connection info (from a zmq url, a json connector file,
        or the cluster profile), optionally sets up ssh tunneling and an
        exec key, then opens the query socket and connects everything
        else via _connect().
        """
        super(Client, self).__init__(debug=debug, profile=profile)
        # a shared Context may be passed in; we only create one if not
        if context is None:
            context = zmq.Context()
        self._context = context
        
        
        # fall back to the profile's connector file if no url/file given
        self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
        if self._cd is not None:
            if url_or_file is None:
                url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
        assert url_or_file is not None, "I can't find enough information to connect to a controller!"\
            " Please specify at least one of url_or_file or profile."
        
        # url_or_file may be either a zmq url or a path to a json file;
        # validate_url raises AssertionError for non-urls
        try:
            validate_url(url_or_file)
        except AssertionError:
            if not os.path.exists(url_or_file):
                # try resolving relative to the profile's security dir
                if self._cd:
                    url_or_file = os.path.join(self._cd.security_dir, url_or_file)
                assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
            with open(url_or_file) as f:
                cfg = json.loads(f.read())
        else:
            cfg = {'url':url_or_file}
        
        # sync defaults from args, json:
        # explicit arguments win over values from the connector file
        if sshserver:
            cfg['ssh'] = sshserver
        if exec_key:
            cfg['exec_key'] = exec_key
        exec_key = cfg['exec_key']
        sshserver=cfg['ssh']
        url = cfg['url']
        location = cfg.setdefault('location', None)
        # rewrite 0.0.0.0 / localhost urls relative to the controller's location
        cfg['url'] = disambiguate_url(cfg['url'], location)
        url = cfg['url']
        
        self._config = cfg
        
        # any ssh-related argument implies tunneling
        self._ssh = bool(sshserver or sshkey or password)
        if self._ssh and sshserver is None:
            # default to ssh via localhost
            sshserver = url.split('://')[1].split(':')[0]
        if self._ssh and password is None:
            if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
                # password=False tells the tunnel machinery not to prompt
                password=False
            else:
                password = getpass("SSH Password for %s: "%sshserver)
        ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
        # exec_key may be a literal key or the path to a keyfile
        if exec_key is not None and os.path.isfile(exec_key):
            arg = 'keyfile'
        else:
            arg = 'key'
        key_arg = {arg:exec_key}
        if username is None:
            self.session = ss.StreamSession(**key_arg)
        else:
            self.session = ss.StreamSession(username, **key_arg)
        # the query socket doubles as the registration channel
        # (registration+query were merged on the controller side)
        self._query_socket = self._context.socket(zmq.XREQ)
        self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
        if self._ssh:
            tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
        else:
            self._query_socket.connect(url)
        
        self.session.debug = self.debug
        
        # dispatch tables for incoming messages, used by the _flush_* methods
        self._notification_handlers = {'registration_notification' : self._register_engine,
                                    'unregistration_notification' : self._unregister_engine,
                                    }
        self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
                                'apply_reply' : self._handle_apply_reply}
        self._connect(sshserver, ssh_kwargs)
357
357
    def __del__(self):
        """cleanup sockets, but _not_ context."""
        # the zmq.Context may have been passed in by (and be shared with)
        # the caller, so only our own sockets are closed here
        self.close()
361
361
362 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
362 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
363 if ipython_dir is None:
363 if ipython_dir is None:
364 ipython_dir = get_ipython_dir()
364 ipython_dir = get_ipython_dir()
365 if cluster_dir is not None:
365 if cluster_dir is not None:
366 try:
366 try:
367 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
367 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
368 return
368 return
369 except ClusterDirError:
369 except ClusterDirError:
370 pass
370 pass
371 elif profile is not None:
371 elif profile is not None:
372 try:
372 try:
373 self._cd = ClusterDir.find_cluster_dir_by_profile(
373 self._cd = ClusterDir.find_cluster_dir_by_profile(
374 ipython_dir, profile)
374 ipython_dir, profile)
375 return
375 return
376 except ClusterDirError:
376 except ClusterDirError:
377 pass
377 pass
378 self._cd = None
378 self._cd = None
379
379
    @property
    def ids(self):
        """Always up-to-date ids property."""
        # drain pending (un)registration notifications first, so the
        # returned list reflects current engine membership
        self._flush_notifications()
        # always copy:
        return list(self._ids)
386
386
387 def close(self):
387 def close(self):
388 if self._closed:
388 if self._closed:
389 return
389 return
390 snames = filter(lambda n: n.endswith('socket'), dir(self))
390 snames = filter(lambda n: n.endswith('socket'), dir(self))
391 for socket in map(lambda name: getattr(self, name), snames):
391 for socket in map(lambda name: getattr(self, name), snames):
392 if isinstance(socket, zmq.Socket) and not socket.closed:
392 if isinstance(socket, zmq.Socket) and not socket.closed:
393 socket.close()
393 socket.close()
394 self._closed = True
394 self._closed = True
395
395
396 def _update_engines(self, engines):
396 def _update_engines(self, engines):
397 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
397 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
398 for k,v in engines.iteritems():
398 for k,v in engines.iteritems():
399 eid = int(k)
399 eid = int(k)
400 self._engines[eid] = bytes(v) # force not unicode
400 self._engines[eid] = bytes(v) # force not unicode
401 self._ids.append(eid)
401 self._ids.append(eid)
402 self._ids = sorted(self._ids)
402 self._ids = sorted(self._ids)
403 if sorted(self._engines.keys()) != range(len(self._engines)) and \
403 if sorted(self._engines.keys()) != range(len(self._engines)) and \
404 self._task_scheme == 'pure' and self._task_socket:
404 self._task_scheme == 'pure' and self._task_socket:
405 self._stop_scheduling_tasks()
405 self._stop_scheduling_tasks()
406
406
407 def _stop_scheduling_tasks(self):
407 def _stop_scheduling_tasks(self):
408 """Stop scheduling tasks because an engine has been unregistered
408 """Stop scheduling tasks because an engine has been unregistered
409 from a pure ZMQ scheduler.
409 from a pure ZMQ scheduler.
410 """
410 """
411
411
412 self._task_socket.close()
412 self._task_socket.close()
413 self._task_socket = None
413 self._task_socket = None
414 msg = "An engine has been unregistered, and we are using pure " +\
414 msg = "An engine has been unregistered, and we are using pure " +\
415 "ZMQ task scheduling. Task farming will be disabled."
415 "ZMQ task scheduling. Task farming will be disabled."
416 if self.outstanding:
416 if self.outstanding:
417 msg += " If you were running tasks when this happened, " +\
417 msg += " If you were running tasks when this happened, " +\
418 "some `outstanding` msg_ids may never resolve."
418 "some `outstanding` msg_ids may never resolve."
419 warnings.warn(msg, RuntimeWarning)
419 warnings.warn(msg, RuntimeWarning)
420
420
421 def _build_targets(self, targets):
421 def _build_targets(self, targets):
422 """Turn valid target IDs or 'all' into two lists:
422 """Turn valid target IDs or 'all' into two lists:
423 (int_ids, uuids).
423 (int_ids, uuids).
424 """
424 """
425 if targets is None:
425 if targets is None:
426 targets = self._ids
426 targets = self._ids
427 elif isinstance(targets, str):
427 elif isinstance(targets, str):
428 if targets.lower() == 'all':
428 if targets.lower() == 'all':
429 targets = self._ids
429 targets = self._ids
430 else:
430 else:
431 raise TypeError("%r not valid str target, must be 'all'"%(targets))
431 raise TypeError("%r not valid str target, must be 'all'"%(targets))
432 elif isinstance(targets, int):
432 elif isinstance(targets, int):
433 targets = [targets]
433 targets = [targets]
434 return [self._engines[t] for t in targets], list(targets)
434 return [self._engines[t] for t in targets], list(targets)
435
435
    def _connect(self, sshserver, ssh_kwargs):
        """setup all our socket connections to the controller. This is called from
        __init__.

        Sends a connection_request over the already-connected query socket
        and, from the reply, opens whichever of the mux/task/notification/
        control/iopub channels the controller advertises.
        """
        
        # Maybe allow reconnecting?
        if self._connected:
            return
        self._connected=True

        def connect_socket(s, url):
            # helper: resolve the url against the controller's location,
            # then connect directly or through the ssh tunnel
            url = disambiguate_url(url, self._config['location'])
            if self._ssh:
                return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
            else:
                return s.connect(url)
        
        # registration and query share one socket now; this is a
        # synchronous request/reply (mode=0 blocks until the reply arrives)
        self.session.send(self._query_socket, 'connection_request')
        idents,msg = self.session.recv(self._query_socket,mode=0)
        if self.debug:
            pprint(msg)
        msg = ss.Message(msg)
        content = msg.content
        self._config['registration'] = dict(content)
        if content.status == 'ok':
            # each advertised channel is optional; only connect what exists.
            # XREQ sockets carry our session id so the controller can route replies.
            if content.mux:
                self._mux_socket = self._context.socket(zmq.XREQ)
                self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._mux_socket, content.mux)
            if content.task:
                # content.task is a (scheme, addr) pair
                self._task_scheme, task_addr = content.task
                self._task_socket = self._context.socket(zmq.XREQ)
                self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._task_socket, task_addr)
            if content.notification:
                # subscribe to everything on the notification channel
                self._notification_socket = self._context.socket(zmq.SUB)
                connect_socket(self._notification_socket, content.notification)
                self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
            # the query socket was already connected in __init__;
            # kept for reference after the registration+query merge:
            # if content.query:
            #     self._query_socket = self._context.socket(zmq.XREQ)
            #     self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
            #     connect_socket(self._query_socket, content.query)
            if content.control:
                self._control_socket = self._context.socket(zmq.XREQ)
                self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._control_socket, content.control)
            if content.iopub:
                self._iopub_socket = self._context.socket(zmq.SUB)
                self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
                self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._iopub_socket, content.iopub)
            # seed our engine table from the controller's current registry
            self._update_engines(dict(content.engines))
        
        else:
            self._connected = False
            raise Exception("Failed to connect!")
491
491
492 #--------------------------------------------------------------------------
492 #--------------------------------------------------------------------------
493 # handlers and callbacks for incoming messages
493 # handlers and callbacks for incoming messages
494 #--------------------------------------------------------------------------
494 #--------------------------------------------------------------------------
495
495
496 def _unwrap_exception(self, content):
496 def _unwrap_exception(self, content):
497 """unwrap exception, and remap engineid to int."""
497 """unwrap exception, and remap engineid to int."""
498 e = error.unwrap_exception(content)
498 e = error.unwrap_exception(content)
499 print e.traceback
499 if e.engine_info:
500 if e.engine_info:
500 e_uuid = e.engine_info['engine_uuid']
501 e_uuid = e.engine_info['engine_uuid']
501 eid = self._engines[e_uuid]
502 eid = self._engines[e_uuid]
502 e.engine_info['engine_id'] = eid
503 e.engine_info['engine_id'] = eid
503 return e
504 return e
504
505
505 def _extract_metadata(self, header, parent, content):
506 def _extract_metadata(self, header, parent, content):
506 md = {'msg_id' : parent['msg_id'],
507 md = {'msg_id' : parent['msg_id'],
507 'received' : datetime.now(),
508 'received' : datetime.now(),
508 'engine_uuid' : header.get('engine', None),
509 'engine_uuid' : header.get('engine', None),
509 'follow' : parent.get('follow', []),
510 'follow' : parent.get('follow', []),
510 'after' : parent.get('after', []),
511 'after' : parent.get('after', []),
511 'status' : content['status'],
512 'status' : content['status'],
512 }
513 }
513
514
514 if md['engine_uuid'] is not None:
515 if md['engine_uuid'] is not None:
515 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
516 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
516
517
517 if 'date' in parent:
518 if 'date' in parent:
518 md['submitted'] = datetime.strptime(parent['date'], util.ISO8601)
519 md['submitted'] = datetime.strptime(parent['date'], util.ISO8601)
519 if 'started' in header:
520 if 'started' in header:
520 md['started'] = datetime.strptime(header['started'], util.ISO8601)
521 md['started'] = datetime.strptime(header['started'], util.ISO8601)
521 if 'date' in header:
522 if 'date' in header:
522 md['completed'] = datetime.strptime(header['date'], util.ISO8601)
523 md['completed'] = datetime.strptime(header['date'], util.ISO8601)
523 return md
524 return md
524
525
525 def _register_engine(self, msg):
526 def _register_engine(self, msg):
526 """Register a new engine, and update our connection info."""
527 """Register a new engine, and update our connection info."""
527 content = msg['content']
528 content = msg['content']
528 eid = content['id']
529 eid = content['id']
529 d = {eid : content['queue']}
530 d = {eid : content['queue']}
530 self._update_engines(d)
531 self._update_engines(d)
531
532
532 def _unregister_engine(self, msg):
533 def _unregister_engine(self, msg):
533 """Unregister an engine that has died."""
534 """Unregister an engine that has died."""
534 content = msg['content']
535 content = msg['content']
535 eid = int(content['id'])
536 eid = int(content['id'])
536 if eid in self._ids:
537 if eid in self._ids:
537 self._ids.remove(eid)
538 self._ids.remove(eid)
538 uuid = self._engines.pop(eid)
539 uuid = self._engines.pop(eid)
539
540
540 self._handle_stranded_msgs(eid, uuid)
541 self._handle_stranded_msgs(eid, uuid)
541
542
542 if self._task_socket and self._task_scheme == 'pure':
543 if self._task_socket and self._task_scheme == 'pure':
543 self._stop_scheduling_tasks()
544 self._stop_scheduling_tasks()
544
545
545 def _handle_stranded_msgs(self, eid, uuid):
546 def _handle_stranded_msgs(self, eid, uuid):
546 """Handle messages known to be on an engine when the engine unregisters.
547 """Handle messages known to be on an engine when the engine unregisters.
547
548
548 It is possible that this will fire prematurely - that is, an engine will
549 It is possible that this will fire prematurely - that is, an engine will
549 go down after completing a result, and the client will be notified
550 go down after completing a result, and the client will be notified
550 of the unregistration and later receive the successful result.
551 of the unregistration and later receive the successful result.
551 """
552 """
552
553
553 outstanding = self._outstanding_dict[uuid]
554 outstanding = self._outstanding_dict[uuid]
554
555
555 for msg_id in list(outstanding):
556 for msg_id in list(outstanding):
556 if msg_id in self.results:
557 if msg_id in self.results:
557 # we already
558 # we already
558 continue
559 continue
559 try:
560 try:
560 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
561 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
561 except:
562 except:
562 content = error.wrap_exception()
563 content = error.wrap_exception()
563 # build a fake message:
564 # build a fake message:
564 parent = {}
565 parent = {}
565 header = {}
566 header = {}
566 parent['msg_id'] = msg_id
567 parent['msg_id'] = msg_id
567 header['engine'] = uuid
568 header['engine'] = uuid
568 header['date'] = datetime.now().strftime(util.ISO8601)
569 header['date'] = datetime.now().strftime(util.ISO8601)
569 msg = dict(parent_header=parent, header=header, content=content)
570 msg = dict(parent_header=parent, header=header, content=content)
570 self._handle_apply_reply(msg)
571 self._handle_apply_reply(msg)
571
572
572 def _handle_execute_reply(self, msg):
573 def _handle_execute_reply(self, msg):
573 """Save the reply to an execute_request into our results.
574 """Save the reply to an execute_request into our results.
574
575
575 execute messages are never actually used. apply is used instead.
576 execute messages are never actually used. apply is used instead.
576 """
577 """
577
578
578 parent = msg['parent_header']
579 parent = msg['parent_header']
579 msg_id = parent['msg_id']
580 msg_id = parent['msg_id']
580 if msg_id not in self.outstanding:
581 if msg_id not in self.outstanding:
581 if msg_id in self.history:
582 if msg_id in self.history:
582 print ("got stale result: %s"%msg_id)
583 print ("got stale result: %s"%msg_id)
583 else:
584 else:
584 print ("got unknown result: %s"%msg_id)
585 print ("got unknown result: %s"%msg_id)
585 else:
586 else:
586 self.outstanding.remove(msg_id)
587 self.outstanding.remove(msg_id)
587 self.results[msg_id] = self._unwrap_exception(msg['content'])
588 self.results[msg_id] = self._unwrap_exception(msg['content'])
588
589
589 def _handle_apply_reply(self, msg):
590 def _handle_apply_reply(self, msg):
590 """Save the reply to an apply_request into our results."""
591 """Save the reply to an apply_request into our results."""
591 parent = msg['parent_header']
592 parent = msg['parent_header']
592 msg_id = parent['msg_id']
593 msg_id = parent['msg_id']
593 if msg_id not in self.outstanding:
594 if msg_id not in self.outstanding:
594 if msg_id in self.history:
595 if msg_id in self.history:
595 print ("got stale result: %s"%msg_id)
596 print ("got stale result: %s"%msg_id)
596 print self.results[msg_id]
597 print self.results[msg_id]
597 print msg
598 print msg
598 else:
599 else:
599 print ("got unknown result: %s"%msg_id)
600 print ("got unknown result: %s"%msg_id)
600 else:
601 else:
601 self.outstanding.remove(msg_id)
602 self.outstanding.remove(msg_id)
602 content = msg['content']
603 content = msg['content']
603 header = msg['header']
604 header = msg['header']
604
605
605 # construct metadata:
606 # construct metadata:
606 md = self.metadata[msg_id]
607 md = self.metadata[msg_id]
607 md.update(self._extract_metadata(header, parent, content))
608 md.update(self._extract_metadata(header, parent, content))
608 # is this redundant?
609 # is this redundant?
609 self.metadata[msg_id] = md
610 self.metadata[msg_id] = md
610
611
611 e_outstanding = self._outstanding_dict[md['engine_uuid']]
612 e_outstanding = self._outstanding_dict[md['engine_uuid']]
612 if msg_id in e_outstanding:
613 if msg_id in e_outstanding:
613 e_outstanding.remove(msg_id)
614 e_outstanding.remove(msg_id)
614
615
615 # construct result:
616 # construct result:
616 if content['status'] == 'ok':
617 if content['status'] == 'ok':
617 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
618 self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
618 elif content['status'] == 'aborted':
619 elif content['status'] == 'aborted':
619 self.results[msg_id] = error.AbortedTask(msg_id)
620 self.results[msg_id] = error.AbortedTask(msg_id)
620 elif content['status'] == 'resubmitted':
621 elif content['status'] == 'resubmitted':
621 # TODO: handle resubmission
622 # TODO: handle resubmission
622 pass
623 pass
623 else:
624 else:
624 self.results[msg_id] = self._unwrap_exception(content)
625 self.results[msg_id] = self._unwrap_exception(content)
625
626
626 def _flush_notifications(self):
627 def _flush_notifications(self):
627 """Flush notifications of engine registrations waiting
628 """Flush notifications of engine registrations waiting
628 in ZMQ queue."""
629 in ZMQ queue."""
629 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
630 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
630 while msg is not None:
631 while msg is not None:
631 if self.debug:
632 if self.debug:
632 pprint(msg)
633 pprint(msg)
633 msg = msg[-1]
634 msg = msg[-1]
634 msg_type = msg['msg_type']
635 msg_type = msg['msg_type']
635 handler = self._notification_handlers.get(msg_type, None)
636 handler = self._notification_handlers.get(msg_type, None)
636 if handler is None:
637 if handler is None:
637 raise Exception("Unhandled message type: %s"%msg.msg_type)
638 raise Exception("Unhandled message type: %s"%msg.msg_type)
638 else:
639 else:
639 handler(msg)
640 handler(msg)
640 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
641 msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
641
642
642 def _flush_results(self, sock):
643 def _flush_results(self, sock):
643 """Flush task or queue results waiting in ZMQ queue."""
644 """Flush task or queue results waiting in ZMQ queue."""
644 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
645 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
645 while msg is not None:
646 while msg is not None:
646 if self.debug:
647 if self.debug:
647 pprint(msg)
648 pprint(msg)
648 msg = msg[-1]
649 msg = msg[-1]
649 msg_type = msg['msg_type']
650 msg_type = msg['msg_type']
650 handler = self._queue_handlers.get(msg_type, None)
651 handler = self._queue_handlers.get(msg_type, None)
651 if handler is None:
652 if handler is None:
652 raise Exception("Unhandled message type: %s"%msg.msg_type)
653 raise Exception("Unhandled message type: %s"%msg.msg_type)
653 else:
654 else:
654 handler(msg)
655 handler(msg)
655 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
656 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
656
657
657 def _flush_control(self, sock):
658 def _flush_control(self, sock):
658 """Flush replies from the control channel waiting
659 """Flush replies from the control channel waiting
659 in the ZMQ queue.
660 in the ZMQ queue.
660
661
661 Currently: ignore them."""
662 Currently: ignore them."""
662 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
663 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
663 while msg is not None:
664 while msg is not None:
664 if self.debug:
665 if self.debug:
665 pprint(msg)
666 pprint(msg)
666 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
667 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
667
668
668 def _flush_iopub(self, sock):
669 def _flush_iopub(self, sock):
669 """Flush replies from the iopub channel waiting
670 """Flush replies from the iopub channel waiting
670 in the ZMQ queue.
671 in the ZMQ queue.
671 """
672 """
672 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
673 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
673 while msg is not None:
674 while msg is not None:
674 if self.debug:
675 if self.debug:
675 pprint(msg)
676 pprint(msg)
676 msg = msg[-1]
677 msg = msg[-1]
677 parent = msg['parent_header']
678 parent = msg['parent_header']
678 msg_id = parent['msg_id']
679 msg_id = parent['msg_id']
679 content = msg['content']
680 content = msg['content']
680 header = msg['header']
681 header = msg['header']
681 msg_type = msg['msg_type']
682 msg_type = msg['msg_type']
682
683
683 # init metadata:
684 # init metadata:
684 md = self.metadata[msg_id]
685 md = self.metadata[msg_id]
685
686
686 if msg_type == 'stream':
687 if msg_type == 'stream':
687 name = content['name']
688 name = content['name']
688 s = md[name] or ''
689 s = md[name] or ''
689 md[name] = s + content['data']
690 md[name] = s + content['data']
690 elif msg_type == 'pyerr':
691 elif msg_type == 'pyerr':
691 md.update({'pyerr' : self._unwrap_exception(content)})
692 md.update({'pyerr' : self._unwrap_exception(content)})
692 else:
693 else:
693 md.update({msg_type : content['data']})
694 md.update({msg_type : content['data']})
694
695
695 # reduntant?
696 # reduntant?
696 self.metadata[msg_id] = md
697 self.metadata[msg_id] = md
697
698
698 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
699 msg = self.session.recv(sock, mode=zmq.NOBLOCK)
699
700
700 #--------------------------------------------------------------------------
701 #--------------------------------------------------------------------------
701 # len, getitem
702 # len, getitem
702 #--------------------------------------------------------------------------
703 #--------------------------------------------------------------------------
703
704
704 def __len__(self):
705 def __len__(self):
705 """len(client) returns # of engines."""
706 """len(client) returns # of engines."""
706 return len(self.ids)
707 return len(self.ids)
707
708
708 def __getitem__(self, key):
709 def __getitem__(self, key):
709 """index access returns DirectView multiplexer objects
710 """index access returns DirectView multiplexer objects
710
711
711 Must be int, slice, or list/tuple/xrange of ints"""
712 Must be int, slice, or list/tuple/xrange of ints"""
712 if not isinstance(key, (int, slice, tuple, list, xrange)):
713 if not isinstance(key, (int, slice, tuple, list, xrange)):
713 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
714 raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
714 else:
715 else:
715 return self.view(key, balanced=False)
716 return self.view(key, balanced=False)
716
717
717 #--------------------------------------------------------------------------
718 #--------------------------------------------------------------------------
718 # Begin public methods
719 # Begin public methods
719 #--------------------------------------------------------------------------
720 #--------------------------------------------------------------------------
720
721
721 def spin(self):
722 def spin(self):
722 """Flush any registration notifications and execution results
723 """Flush any registration notifications and execution results
723 waiting in the ZMQ queue.
724 waiting in the ZMQ queue.
724 """
725 """
725 if self._notification_socket:
726 if self._notification_socket:
726 self._flush_notifications()
727 self._flush_notifications()
727 if self._mux_socket:
728 if self._mux_socket:
728 self._flush_results(self._mux_socket)
729 self._flush_results(self._mux_socket)
729 if self._task_socket:
730 if self._task_socket:
730 self._flush_results(self._task_socket)
731 self._flush_results(self._task_socket)
731 if self._control_socket:
732 if self._control_socket:
732 self._flush_control(self._control_socket)
733 self._flush_control(self._control_socket)
733 if self._iopub_socket:
734 if self._iopub_socket:
734 self._flush_iopub(self._iopub_socket)
735 self._flush_iopub(self._iopub_socket)
735
736
736 def barrier(self, jobs=None, timeout=-1):
737 def barrier(self, jobs=None, timeout=-1):
737 """waits on one or more `jobs`, for up to `timeout` seconds.
738 """waits on one or more `jobs`, for up to `timeout` seconds.
738
739
739 Parameters
740 Parameters
740 ----------
741 ----------
741
742
742 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
743 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
743 ints are indices to self.history
744 ints are indices to self.history
744 strs are msg_ids
745 strs are msg_ids
745 default: wait on all outstanding messages
746 default: wait on all outstanding messages
746 timeout : float
747 timeout : float
747 a time in seconds, after which to give up.
748 a time in seconds, after which to give up.
748 default is -1, which means no timeout
749 default is -1, which means no timeout
749
750
750 Returns
751 Returns
751 -------
752 -------
752
753
753 True : when all msg_ids are done
754 True : when all msg_ids are done
754 False : timeout reached, some msg_ids still outstanding
755 False : timeout reached, some msg_ids still outstanding
755 """
756 """
756 tic = time.time()
757 tic = time.time()
757 if jobs is None:
758 if jobs is None:
758 theids = self.outstanding
759 theids = self.outstanding
759 else:
760 else:
760 if isinstance(jobs, (int, str, AsyncResult)):
761 if isinstance(jobs, (int, str, AsyncResult)):
761 jobs = [jobs]
762 jobs = [jobs]
762 theids = set()
763 theids = set()
763 for job in jobs:
764 for job in jobs:
764 if isinstance(job, int):
765 if isinstance(job, int):
765 # index access
766 # index access
766 job = self.history[job]
767 job = self.history[job]
767 elif isinstance(job, AsyncResult):
768 elif isinstance(job, AsyncResult):
768 map(theids.add, job.msg_ids)
769 map(theids.add, job.msg_ids)
769 continue
770 continue
770 theids.add(job)
771 theids.add(job)
771 if not theids.intersection(self.outstanding):
772 if not theids.intersection(self.outstanding):
772 return True
773 return True
773 self.spin()
774 self.spin()
774 while theids.intersection(self.outstanding):
775 while theids.intersection(self.outstanding):
775 if timeout >= 0 and ( time.time()-tic ) > timeout:
776 if timeout >= 0 and ( time.time()-tic ) > timeout:
776 break
777 break
777 time.sleep(1e-3)
778 time.sleep(1e-3)
778 self.spin()
779 self.spin()
779 return len(theids.intersection(self.outstanding)) == 0
780 return len(theids.intersection(self.outstanding)) == 0
780
781
781 #--------------------------------------------------------------------------
782 #--------------------------------------------------------------------------
782 # Control methods
783 # Control methods
783 #--------------------------------------------------------------------------
784 #--------------------------------------------------------------------------
784
785
785 @spinfirst
786 @spinfirst
786 @defaultblock
787 @defaultblock
787 def clear(self, targets=None, block=None):
788 def clear(self, targets=None, block=None):
788 """Clear the namespace in target(s)."""
789 """Clear the namespace in target(s)."""
789 targets = self._build_targets(targets)[0]
790 targets = self._build_targets(targets)[0]
790 for t in targets:
791 for t in targets:
791 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
792 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
792 error = False
793 error = False
793 if self.block:
794 if self.block:
794 for i in range(len(targets)):
795 for i in range(len(targets)):
795 idents,msg = self.session.recv(self._control_socket,0)
796 idents,msg = self.session.recv(self._control_socket,0)
796 if self.debug:
797 if self.debug:
797 pprint(msg)
798 pprint(msg)
798 if msg['content']['status'] != 'ok':
799 if msg['content']['status'] != 'ok':
799 error = self._unwrap_exception(msg['content'])
800 error = self._unwrap_exception(msg['content'])
800 if error:
801 if error:
801 raise error
802 raise error
802
803
803
804
804 @spinfirst
805 @spinfirst
805 @defaultblock
806 @defaultblock
806 def abort(self, jobs=None, targets=None, block=None):
807 def abort(self, jobs=None, targets=None, block=None):
807 """Abort specific jobs from the execution queues of target(s).
808 """Abort specific jobs from the execution queues of target(s).
808
809
809 This is a mechanism to prevent jobs that have already been submitted
810 This is a mechanism to prevent jobs that have already been submitted
810 from executing.
811 from executing.
811
812
812 Parameters
813 Parameters
813 ----------
814 ----------
814
815
815 jobs : msg_id, list of msg_ids, or AsyncResult
816 jobs : msg_id, list of msg_ids, or AsyncResult
816 The jobs to be aborted
817 The jobs to be aborted
817
818
818
819
819 """
820 """
820 targets = self._build_targets(targets)[0]
821 targets = self._build_targets(targets)[0]
821 msg_ids = []
822 msg_ids = []
822 if isinstance(jobs, (basestring,AsyncResult)):
823 if isinstance(jobs, (basestring,AsyncResult)):
823 jobs = [jobs]
824 jobs = [jobs]
824 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
825 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
825 if bad_ids:
826 if bad_ids:
826 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
827 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
827 for j in jobs:
828 for j in jobs:
828 if isinstance(j, AsyncResult):
829 if isinstance(j, AsyncResult):
829 msg_ids.extend(j.msg_ids)
830 msg_ids.extend(j.msg_ids)
830 else:
831 else:
831 msg_ids.append(j)
832 msg_ids.append(j)
832 content = dict(msg_ids=msg_ids)
833 content = dict(msg_ids=msg_ids)
833 for t in targets:
834 for t in targets:
834 self.session.send(self._control_socket, 'abort_request',
835 self.session.send(self._control_socket, 'abort_request',
835 content=content, ident=t)
836 content=content, ident=t)
836 error = False
837 error = False
837 if self.block:
838 if self.block:
838 for i in range(len(targets)):
839 for i in range(len(targets)):
839 idents,msg = self.session.recv(self._control_socket,0)
840 idents,msg = self.session.recv(self._control_socket,0)
840 if self.debug:
841 if self.debug:
841 pprint(msg)
842 pprint(msg)
842 if msg['content']['status'] != 'ok':
843 if msg['content']['status'] != 'ok':
843 error = self._unwrap_exception(msg['content'])
844 error = self._unwrap_exception(msg['content'])
844 if error:
845 if error:
845 raise error
846 raise error
846
847
847 @spinfirst
848 @spinfirst
848 @defaultblock
849 @defaultblock
849 def shutdown(self, targets=None, restart=False, controller=False, block=None):
850 def shutdown(self, targets=None, restart=False, controller=False, block=None):
850 """Terminates one or more engine processes, optionally including the controller."""
851 """Terminates one or more engine processes, optionally including the controller."""
851 if controller:
852 if controller:
852 targets = 'all'
853 targets = 'all'
853 targets = self._build_targets(targets)[0]
854 targets = self._build_targets(targets)[0]
854 for t in targets:
855 for t in targets:
855 self.session.send(self._control_socket, 'shutdown_request',
856 self.session.send(self._control_socket, 'shutdown_request',
856 content={'restart':restart},ident=t)
857 content={'restart':restart},ident=t)
857 error = False
858 error = False
858 if block or controller:
859 if block or controller:
859 for i in range(len(targets)):
860 for i in range(len(targets)):
860 idents,msg = self.session.recv(self._control_socket,0)
861 idents,msg = self.session.recv(self._control_socket,0)
861 if self.debug:
862 if self.debug:
862 pprint(msg)
863 pprint(msg)
863 if msg['content']['status'] != 'ok':
864 if msg['content']['status'] != 'ok':
864 error = self._unwrap_exception(msg['content'])
865 error = self._unwrap_exception(msg['content'])
865
866
866 if controller:
867 if controller:
867 time.sleep(0.25)
868 time.sleep(0.25)
868 self.session.send(self._query_socket, 'shutdown_request')
869 self.session.send(self._query_socket, 'shutdown_request')
869 idents,msg = self.session.recv(self._query_socket, 0)
870 idents,msg = self.session.recv(self._query_socket, 0)
870 if self.debug:
871 if self.debug:
871 pprint(msg)
872 pprint(msg)
872 if msg['content']['status'] != 'ok':
873 if msg['content']['status'] != 'ok':
873 error = self._unwrap_exception(msg['content'])
874 error = self._unwrap_exception(msg['content'])
874
875
875 if error:
876 if error:
876 raise error
877 raise error
877
878
878 #--------------------------------------------------------------------------
879 #--------------------------------------------------------------------------
879 # Execution methods
880 # Execution methods
880 #--------------------------------------------------------------------------
881 #--------------------------------------------------------------------------
881
882
882 @defaultblock
883 @defaultblock
883 def execute(self, code, targets='all', block=None):
884 def execute(self, code, targets='all', block=None):
884 """Executes `code` on `targets` in blocking or nonblocking manner.
885 """Executes `code` on `targets` in blocking or nonblocking manner.
885
886
886 ``execute`` is always `bound` (affects engine namespace)
887 ``execute`` is always `bound` (affects engine namespace)
887
888
888 Parameters
889 Parameters
889 ----------
890 ----------
890
891
891 code : str
892 code : str
892 the code string to be executed
893 the code string to be executed
893 targets : int/str/list of ints/strs
894 targets : int/str/list of ints/strs
894 the engines on which to execute
895 the engines on which to execute
895 default : all
896 default : all
896 block : bool
897 block : bool
897 whether or not to wait until done to return
898 whether or not to wait until done to return
898 default: self.block
899 default: self.block
899 """
900 """
900 result = self.apply(_execute, (code,), targets=targets, block=block, bound=True, balanced=False)
901 result = self.apply(_execute, (code,), targets=targets, block=block, bound=True, balanced=False)
901 if not block:
902 if not block:
902 return result
903 return result
903
904
904 def run(self, filename, targets='all', block=None):
905 def run(self, filename, targets='all', block=None):
905 """Execute contents of `filename` on engine(s).
906 """Execute contents of `filename` on engine(s).
906
907
907 This simply reads the contents of the file and calls `execute`.
908 This simply reads the contents of the file and calls `execute`.
908
909
909 Parameters
910 Parameters
910 ----------
911 ----------
911
912
912 filename : str
913 filename : str
913 The path to the file
914 The path to the file
914 targets : int/str/list of ints/strs
915 targets : int/str/list of ints/strs
915 the engines on which to execute
916 the engines on which to execute
916 default : all
917 default : all
917 block : bool
918 block : bool
918 whether or not to wait until done
919 whether or not to wait until done
919 default: self.block
920 default: self.block
920
921
921 """
922 """
922 with open(filename, 'r') as f:
923 with open(filename, 'r') as f:
923 # add newline in case of trailing indented whitespace
924 # add newline in case of trailing indented whitespace
924 # which will cause SyntaxError
925 # which will cause SyntaxError
925 code = f.read()+'\n'
926 code = f.read()+'\n'
926 return self.execute(code, targets=targets, block=block)
927 return self.execute(code, targets=targets, block=block)
927
928
928 def _maybe_raise(self, result):
929 def _maybe_raise(self, result):
929 """wrapper for maybe raising an exception if apply failed."""
930 """wrapper for maybe raising an exception if apply failed."""
930 if isinstance(result, error.RemoteError):
931 if isinstance(result, error.RemoteError):
931 raise result
932 raise result
932
933
933 return result
934 return result
934
935
935 def _build_dependency(self, dep):
936 def _build_dependency(self, dep):
936 """helper for building jsonable dependencies from various input forms"""
937 """helper for building jsonable dependencies from various input forms"""
937 if isinstance(dep, Dependency):
938 if isinstance(dep, Dependency):
938 return dep.as_dict()
939 return dep.as_dict()
939 elif isinstance(dep, AsyncResult):
940 elif isinstance(dep, AsyncResult):
940 return dep.msg_ids
941 return dep.msg_ids
941 elif dep is None:
942 elif dep is None:
942 return []
943 return []
943 else:
944 else:
944 # pass to Dependency constructor
945 # pass to Dependency constructor
945 return list(Dependency(dep))
946 return list(Dependency(dep))
946
947
947 @defaultblock
948 @defaultblock
    def apply(self, f, args=None, kwargs=None, bound=False, block=None,
                        targets=None, balanced=None,
                        after=None, follow=None, timeout=None,
                        track=False):
        """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.

        This is the central execution command for the client.

        Parameters
        ----------

        f : function
            The fuction to be called remotely
        args : tuple/list
            The positional arguments passed to `f`
        kwargs : dict
            The keyword arguments passed to `f`
        bound : bool (default: False)
            Whether to pass the Engine(s) Namespace as the first argument to `f`.
        block : bool (default: self.block)
            Whether to wait for the result, or return immediately.
            False:
                returns AsyncResult
            True:
                returns actual result(s) of f(*args, **kwargs)
                if multiple targets:
                    list of results, matching `targets`
        targets : int,list of ints, 'all', None
            Specify the destination of the job.
            if None:
                Submit via Task queue for load-balancing.
            if 'all':
                Run on all active engines
            if list:
                Run on each specified engine
            if int:
                Run on single engine

        balanced : bool, default None
            whether to load-balance.  This will default to True
            if targets is unspecified, or False if targets is specified.

        The following arguments are only used when balanced is True:
        after : Dependency or collection of msg_ids
            Only for load-balanced execution (targets=None)
            Specify a list of msg_ids as a time-based dependency.
            This job will only be run *after* the dependencies
            have been met.

        follow : Dependency or collection of msg_ids
            Only for load-balanced execution (targets=None)
            Specify a list of msg_ids as a location-based dependency.
            This job will only be run on an engine where this dependency
            is met.

        timeout : float/int or None
            Only for load-balanced execution (targets=None)
            Specify an amount of time (in seconds) for the scheduler to
            wait for dependencies to be met before failing with a
            DependencyTimeout.
        track : bool
            whether to track non-copying sends.
            [default False]

        after,follow,timeout only used if `balanced=True`.

        Returns
        -------

        if block is False:
            return AsyncResult wrapping msg_ids
            output of AsyncResult.get() is identical to that of `apply(...block=True)`
        else:
            if single target:
                return result of `f(*args, **kwargs)`
            else:
                return list of results, matching `targets`

        Raises
        ------
        TypeError
            if `f` is not callable, `args` is not a tuple/list, or
            `kwargs` is not a dict.
        ValueError
            if `follow`, `after`, or `timeout` are given without
            load-balanced execution.
        """
        # NOTE(review): assert is stripped under `python -O`; a closed client
        # would then fail later with a less clear error.
        assert not self._closed, "cannot use me anymore, I'm closed!"
        # defaults:
        block = block if block is not None else self.block
        args = args if args is not None else []
        kwargs = kwargs if kwargs is not None else {}

        # balanced defaults to True only when no explicit targets were given
        if balanced is None:
            if targets is None:
                # default to balanced if targets unspecified
                balanced = True
            else:
                # otherwise default to multiplexing
                balanced = False

        if targets is None and balanced is False:
            # default to all if *not* balanced, and targets is unspecified
            targets = 'all'

        # enforce types of f,args,kwrags
        if not callable(f):
            raise TypeError("f must be callable, not %s"%type(f))
        if not isinstance(args, (tuple, list)):
            raise TypeError("args must be tuple or list, not %s"%type(args))
        if not isinstance(kwargs, dict):
            raise TypeError("kwargs must be dict, not %s"%type(kwargs))

        # options shared by both submission paths
        options = dict(bound=bound, block=block, targets=targets, track=track)

        if balanced:
            # load-balanced: dependencies are meaningful here
            return self._apply_balanced(f, args, kwargs, timeout=timeout,
                                        after=after, follow=follow, **options)
        elif follow or after or timeout:
            # dependency args without balancing is a user error
            msg = "follow, after, and timeout args are only used for"
            msg += " load-balanced execution."
            raise ValueError(msg)
        else:
            # direct (multiplexed) execution on explicit targets
            return self._apply_direct(f, args, kwargs, **options)
1063
1064
    def _apply_balanced(self, f, args, kwargs, bound=None, block=None, targets=None,
                            after=None, follow=None, timeout=None, track=None):
        """call f(*args, **kwargs) remotely in a load-balanced manner.

        Submits a single apply_request to the task scheduler socket and
        returns (or blocks on) an AsyncResult for the one resulting msg_id.

        This is a private method, see `apply` for details.
        Not to be called directly!
        """

        # `apply` is expected to have filled these in; guard against direct calls
        loc = locals()
        for name in ('bound', 'block', 'track'):
            assert loc[name] is not None, "kwarg %r must be specified!"%name

        if self._task_socket is None:
            msg = "Task farming is disabled"
            if self._task_scheme == 'pure':
                msg += " because the pure ZMQ scheduler cannot handle"
                msg += " disappearing engines."
            raise RuntimeError(msg)

        if self._task_scheme == 'pure':
            # pure zmq scheme doesn't support dependencies
            msg = "Pure ZMQ scheduler doesn't support dependencies"
            if (follow or after):
                # hard fail on DAG dependencies
                raise RuntimeError(msg)
            if isinstance(f, dependent):
                # soft warn on functional dependencies
                warnings.warn(msg, RuntimeWarning)

        # defaults:
        args = args if args is not None else []
        kwargs = kwargs if kwargs is not None else {}

        # restrict the scheduler's engine choices when targets were given
        if targets:
            idents,_ = self._build_targets(targets)
        else:
            idents = []

        after = self._build_dependency(after)
        follow = self._build_dependency(follow)
        subheader = dict(after=after, follow=follow, timeout=timeout, targets=idents)
        bufs = util.pack_apply_message(f,args,kwargs)
        content = dict(bound=bound)

        msg = self.session.send(self._task_socket, "apply_request",
                content=content, buffers=bufs, subheader=subheader, track=track)
        msg_id = msg['msg_id']
        # record the submission for bookkeeping / later result retrieval
        self.outstanding.add(msg_id)
        self.history.append(msg_id)
        self.metadata[msg_id]['submitted'] = datetime.now()
        # only a real zmq tracker when track was requested
        tracker = None if track is False else msg['tracker']
        ar = AsyncResult(self, [msg_id], fname=f.__name__, targets=targets, tracker=tracker)
        if block:
            try:
                return ar.get()
            except KeyboardInterrupt:
                # interrupted wait: hand back the AsyncResult instead of raising
                return ar
        else:
            return ar
1123
1124
    def _apply_direct(self, f, args, kwargs, bound=None, block=None, targets=None,
                            track=None):
        """The underlying method for applying functions to specific engines
        via the MUX queue.

        Sends one apply_request per resolved engine identity and returns
        (or blocks on) a single AsyncResult covering all msg_ids.

        This is a private method, see `apply` for details.
        Not to be called directly!
        """
        # `apply` is expected to have filled these in; guard against direct calls
        loc = locals()
        for name in ('bound', 'block', 'targets', 'track'):
            assert loc[name] is not None, "kwarg %r must be specified!"%name

        idents,targets = self._build_targets(targets)

        subheader = {}
        content = dict(bound=bound)
        bufs = util.pack_apply_message(f,args,kwargs)

        msg_ids = []
        trackers = []
        # one request per engine; the same buffers are reused for each send
        for ident in idents:
            msg = self.session.send(self._mux_socket, "apply_request",
                    content=content, buffers=bufs, ident=ident, subheader=subheader,
                    track=track)
            if track:
                trackers.append(msg['tracker'])
            msg_id = msg['msg_id']
            self.outstanding.add(msg_id)
            # also track per-engine outstanding msg_ids
            self._outstanding_dict[ident].add(msg_id)
            self.history.append(msg_id)
            msg_ids.append(msg_id)

        # aggregate per-send trackers into one MessageTracker when tracking
        tracker = None if track is False else zmq.MessageTracker(*trackers)
        ar = AsyncResult(self, msg_ids, fname=f.__name__, targets=targets, tracker=tracker)

        if block:
            try:
                return ar.get()
            except KeyboardInterrupt:
                # interrupted wait: hand back the AsyncResult instead of raising
                return ar
        else:
            return ar
1166
1167
1167 #--------------------------------------------------------------------------
1168 #--------------------------------------------------------------------------
1168 # construct a View object
1169 # construct a View object
1169 #--------------------------------------------------------------------------
1170 #--------------------------------------------------------------------------
1170
1171
1171 @defaultblock
1172 @defaultblock
1172 def remote(self, bound=False, block=None, targets=None, balanced=None):
1173 def remote(self, bound=False, block=None, targets=None, balanced=None):
1173 """Decorator for making a RemoteFunction"""
1174 """Decorator for making a RemoteFunction"""
1174 return remote(self, bound=bound, targets=targets, block=block, balanced=balanced)
1175 return remote(self, bound=bound, targets=targets, block=block, balanced=balanced)
1175
1176
    @defaultblock
    def parallel(self, dist='b', bound=False, block=None, targets=None, balanced=None):
        """Decorator for making a ParallelFunction"""
        # NOTE(review): the `dist` parameter is accepted here but never forwarded
        # to the module-level `parallel(...)` helper — confirm whether it should
        # be passed through (cf. `scatter`/`gather`, which do use `dist`).
        return parallel(self, bound=bound, targets=targets, block=block, balanced=balanced)
1180
1181
1181 def _cache_view(self, targets, balanced):
1182 def _cache_view(self, targets, balanced):
1182 """save views, so subsequent requests don't create new objects."""
1183 """save views, so subsequent requests don't create new objects."""
1183 if balanced:
1184 if balanced:
1184 view_class = LoadBalancedView
1185 view_class = LoadBalancedView
1185 view_cache = self._balanced_views
1186 view_cache = self._balanced_views
1186 else:
1187 else:
1187 view_class = DirectView
1188 view_class = DirectView
1188 view_cache = self._direct_views
1189 view_cache = self._direct_views
1189
1190
1190 # use str, since often targets will be a list
1191 # use str, since often targets will be a list
1191 key = str(targets)
1192 key = str(targets)
1192 if key not in view_cache:
1193 if key not in view_cache:
1193 view_cache[key] = view_class(client=self, targets=targets)
1194 view_cache[key] = view_class(client=self, targets=targets)
1194
1195
1195 return view_cache[key]
1196 return view_cache[key]
1196
1197
    def view(self, targets=None, balanced=None):
        """Method for constructing View objects.

        If no arguments are specified, create a LoadBalancedView
        using all engines.  If only `targets` specified, it will
        be a DirectView.  This method is the underlying implementation
        of ``client.__getitem__``.

        Parameters
        ----------

        targets: list,slice,int,etc. [default: use all engines]
            The engines to use for the View
        balanced : bool [default: False if targets specified, True else]
            whether to build a LoadBalancedView or a DirectView

        Raises
        ------
        IndexError
            if an int target does not name a registered engine.
        TypeError
            if targets is not an int, slice, or collection of ints.
        """

        # default: balanced iff no explicit targets were given
        balanced = (targets is None) if balanced is None else balanced

        if targets is None:
            if balanced:
                return self._cache_view(None,True)
            else:
                # unbalanced with no targets means "all engines", via a full slice
                targets = slice(None)

        if isinstance(targets, int):
            # negative ints index from the end of the sorted id list
            if targets < 0:
                targets = self.ids[targets]
            if targets not in self.ids:
                raise IndexError("No such engine: %i"%targets)
            return self._cache_view(targets, balanced)

        if isinstance(targets, slice):
            # resolve the slice against sorted engine ids, then fall through
            # to the list-handling branch below
            indices = range(len(self.ids))[targets]
            ids = sorted(self._ids)
            targets = [ ids[i] for i in indices ]

        if isinstance(targets, (tuple, list, xrange)):
            _,targets = self._build_targets(list(targets))
            return self._cache_view(targets, balanced)
        else:
            raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
1240
1241
1241 #--------------------------------------------------------------------------
1242 #--------------------------------------------------------------------------
1242 # Data movement
1243 # Data movement
1243 #--------------------------------------------------------------------------
1244 #--------------------------------------------------------------------------
1244
1245
1245 @defaultblock
1246 @defaultblock
1246 def push(self, ns, targets='all', block=None, track=False):
1247 def push(self, ns, targets='all', block=None, track=False):
1247 """Push the contents of `ns` into the namespace on `target`"""
1248 """Push the contents of `ns` into the namespace on `target`"""
1248 if not isinstance(ns, dict):
1249 if not isinstance(ns, dict):
1249 raise TypeError("Must be a dict, not %s"%type(ns))
1250 raise TypeError("Must be a dict, not %s"%type(ns))
1250 result = self.apply(_push, kwargs=ns, targets=targets, block=block, bound=True, balanced=False, track=track)
1251 result = self.apply(_push, kwargs=ns, targets=targets, block=block, bound=True, balanced=False, track=track)
1251 if not block:
1252 if not block:
1252 return result
1253 return result
1253
1254
1254 @defaultblock
1255 @defaultblock
1255 def pull(self, keys, targets='all', block=None):
1256 def pull(self, keys, targets='all', block=None):
1256 """Pull objects from `target`'s namespace by `keys`"""
1257 """Pull objects from `target`'s namespace by `keys`"""
1257 if isinstance(keys, basestring):
1258 if isinstance(keys, basestring):
1258 pass
1259 pass
1259 elif isinstance(keys, (list,tuple,set)):
1260 elif isinstance(keys, (list,tuple,set)):
1260 for key in keys:
1261 for key in keys:
1261 if not isinstance(key, basestring):
1262 if not isinstance(key, basestring):
1262 raise TypeError("keys must be str, not type %r"%type(key))
1263 raise TypeError("keys must be str, not type %r"%type(key))
1263 else:
1264 else:
1264 raise TypeError("keys must be strs, not %r"%keys)
1265 raise TypeError("keys must be strs, not %r"%keys)
1265 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1266 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True, balanced=False)
1266 return result
1267 return result
1267
1268
    @defaultblock
    def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None, track=False):
        """
        Partition a Python sequence and send the partitions to a set of engines.

        Each engine receives its partition of `seq` as `key` via `push`.
        When `block` is True this waits for completion and returns None;
        otherwise it returns an AsyncResult covering all pushes.
        """
        targets = self._build_targets(targets)[-1]
        # the Map subclass selected by `dist` decides how seq is partitioned
        mapObject = Map.dists[dist]()
        nparts = len(targets)
        msg_ids = []
        trackers = []
        for index, engineid in enumerate(targets):
            partition = mapObject.getPartition(seq, index, nparts)
            if flatten and len(partition) == 1:
                # flatten: send the bare element instead of a 1-element list
                r = self.push({key: partition[0]}, targets=engineid, block=False, track=track)
            else:
                r = self.push({key: partition}, targets=engineid, block=False, track=track)
            msg_ids.extend(r.msg_ids)
            if track:
                trackers.append(r._tracker)

        # aggregate per-push trackers into one MessageTracker when tracking
        if track:
            tracker = zmq.MessageTracker(*trackers)
        else:
            tracker = None

        r = AsyncResult(self, msg_ids, fname='scatter', targets=targets, tracker=tracker)
        if block:
            # NOTE(review): blocking path returns None and, unlike `gather`,
            # does not call r.get(), so errors are not raised here — confirm
            # whether that asymmetry is intentional.
            r.wait()
        else:
            return r
1298
1299
1299 @defaultblock
1300 @defaultblock
1300 def gather(self, key, dist='b', targets='all', block=None):
1301 def gather(self, key, dist='b', targets='all', block=None):
1301 """
1302 """
1302 Gather a partitioned sequence on a set of engines as a single local seq.
1303 Gather a partitioned sequence on a set of engines as a single local seq.
1303 """
1304 """
1304
1305
1305 targets = self._build_targets(targets)[-1]
1306 targets = self._build_targets(targets)[-1]
1306 mapObject = Map.dists[dist]()
1307 mapObject = Map.dists[dist]()
1307 msg_ids = []
1308 msg_ids = []
1308 for index, engineid in enumerate(targets):
1309 for index, engineid in enumerate(targets):
1309 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
1310 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
1310
1311
1311 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
1312 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
1312 if block:
1313 if block:
1313 return r.get()
1314 return r.get()
1314 else:
1315 else:
1315 return r
1316 return r
1316
1317
1317 #--------------------------------------------------------------------------
1318 #--------------------------------------------------------------------------
1318 # Query methods
1319 # Query methods
1319 #--------------------------------------------------------------------------
1320 #--------------------------------------------------------------------------
1320
1321
1321 @spinfirst
1322 @spinfirst
1322 @defaultblock
1323 @defaultblock
1323 def get_result(self, indices_or_msg_ids=None, block=None):
1324 def get_result(self, indices_or_msg_ids=None, block=None):
1324 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1325 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
1325
1326
1326 If the client already has the results, no request to the Hub will be made.
1327 If the client already has the results, no request to the Hub will be made.
1327
1328
1328 This is a convenient way to construct AsyncResult objects, which are wrappers
1329 This is a convenient way to construct AsyncResult objects, which are wrappers
1329 that include metadata about execution, and allow for awaiting results that
1330 that include metadata about execution, and allow for awaiting results that
1330 were not submitted by this Client.
1331 were not submitted by this Client.
1331
1332
1332 It can also be a convenient way to retrieve the metadata associated with
1333 It can also be a convenient way to retrieve the metadata associated with
1333 blocking execution, since it always retrieves
1334 blocking execution, since it always retrieves
1334
1335
1335 Examples
1336 Examples
1336 --------
1337 --------
1337 ::
1338 ::
1338
1339
1339 In [10]: r = client.apply()
1340 In [10]: r = client.apply()
1340
1341
1341 Parameters
1342 Parameters
1342 ----------
1343 ----------
1343
1344
1344 indices_or_msg_ids : integer history index, str msg_id, or list of either
1345 indices_or_msg_ids : integer history index, str msg_id, or list of either
1345 The indices or msg_ids of indices to be retrieved
1346 The indices or msg_ids of indices to be retrieved
1346
1347
1347 block : bool
1348 block : bool
1348 Whether to wait for the result to be done
1349 Whether to wait for the result to be done
1349
1350
1350 Returns
1351 Returns
1351 -------
1352 -------
1352
1353
1353 AsyncResult
1354 AsyncResult
1354 A single AsyncResult object will always be returned.
1355 A single AsyncResult object will always be returned.
1355
1356
1356 AsyncHubResult
1357 AsyncHubResult
1357 A subclass of AsyncResult that retrieves results from the Hub
1358 A subclass of AsyncResult that retrieves results from the Hub
1358
1359
1359 """
1360 """
1360 if indices_or_msg_ids is None:
1361 if indices_or_msg_ids is None:
1361 indices_or_msg_ids = -1
1362 indices_or_msg_ids = -1
1362
1363
1363 if not isinstance(indices_or_msg_ids, (list,tuple)):
1364 if not isinstance(indices_or_msg_ids, (list,tuple)):
1364 indices_or_msg_ids = [indices_or_msg_ids]
1365 indices_or_msg_ids = [indices_or_msg_ids]
1365
1366
1366 theids = []
1367 theids = []
1367 for id in indices_or_msg_ids:
1368 for id in indices_or_msg_ids:
1368 if isinstance(id, int):
1369 if isinstance(id, int):
1369 id = self.history[id]
1370 id = self.history[id]
1370 if not isinstance(id, str):
1371 if not isinstance(id, str):
1371 raise TypeError("indices must be str or int, not %r"%id)
1372 raise TypeError("indices must be str or int, not %r"%id)
1372 theids.append(id)
1373 theids.append(id)
1373
1374
1374 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1375 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1375 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1376 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1376
1377
1377 if remote_ids:
1378 if remote_ids:
1378 ar = AsyncHubResult(self, msg_ids=theids)
1379 ar = AsyncHubResult(self, msg_ids=theids)
1379 else:
1380 else:
1380 ar = AsyncResult(self, msg_ids=theids)
1381 ar = AsyncResult(self, msg_ids=theids)
1381
1382
1382 if block:
1383 if block:
1383 ar.wait()
1384 ar.wait()
1384
1385
1385 return ar
1386 return ar
1386
1387
1387 @spinfirst
1388 @spinfirst
1388 def result_status(self, msg_ids, status_only=True):
1389 def result_status(self, msg_ids, status_only=True):
1389 """Check on the status of the result(s) of the apply request with `msg_ids`.
1390 """Check on the status of the result(s) of the apply request with `msg_ids`.
1390
1391
1391 If status_only is False, then the actual results will be retrieved, else
1392 If status_only is False, then the actual results will be retrieved, else
1392 only the status of the results will be checked.
1393 only the status of the results will be checked.
1393
1394
1394 Parameters
1395 Parameters
1395 ----------
1396 ----------
1396
1397
1397 msg_ids : list of msg_ids
1398 msg_ids : list of msg_ids
1398 if int:
1399 if int:
1399 Passed as index to self.history for convenience.
1400 Passed as index to self.history for convenience.
1400 status_only : bool (default: True)
1401 status_only : bool (default: True)
1401 if False:
1402 if False:
1402 Retrieve the actual results of completed tasks.
1403 Retrieve the actual results of completed tasks.
1403
1404
1404 Returns
1405 Returns
1405 -------
1406 -------
1406
1407
1407 results : dict
1408 results : dict
1408 There will always be the keys 'pending' and 'completed', which will
1409 There will always be the keys 'pending' and 'completed', which will
1409 be lists of msg_ids that are incomplete or complete. If `status_only`
1410 be lists of msg_ids that are incomplete or complete. If `status_only`
1410 is False, then completed results will be keyed by their `msg_id`.
1411 is False, then completed results will be keyed by their `msg_id`.
1411 """
1412 """
1412 if not isinstance(msg_ids, (list,tuple)):
1413 if not isinstance(msg_ids, (list,tuple)):
1413 msg_ids = [msg_ids]
1414 msg_ids = [msg_ids]
1414
1415
1415 theids = []
1416 theids = []
1416 for msg_id in msg_ids:
1417 for msg_id in msg_ids:
1417 if isinstance(msg_id, int):
1418 if isinstance(msg_id, int):
1418 msg_id = self.history[msg_id]
1419 msg_id = self.history[msg_id]
1419 if not isinstance(msg_id, basestring):
1420 if not isinstance(msg_id, basestring):
1420 raise TypeError("msg_ids must be str, not %r"%msg_id)
1421 raise TypeError("msg_ids must be str, not %r"%msg_id)
1421 theids.append(msg_id)
1422 theids.append(msg_id)
1422
1423
1423 completed = []
1424 completed = []
1424 local_results = {}
1425 local_results = {}
1425
1426
1426 # comment this block out to temporarily disable local shortcut:
1427 # comment this block out to temporarily disable local shortcut:
1427 for msg_id in theids:
1428 for msg_id in theids:
1428 if msg_id in self.results:
1429 if msg_id in self.results:
1429 completed.append(msg_id)
1430 completed.append(msg_id)
1430 local_results[msg_id] = self.results[msg_id]
1431 local_results[msg_id] = self.results[msg_id]
1431 theids.remove(msg_id)
1432 theids.remove(msg_id)
1432
1433
1433 if theids: # some not locally cached
1434 if theids: # some not locally cached
1434 content = dict(msg_ids=theids, status_only=status_only)
1435 content = dict(msg_ids=theids, status_only=status_only)
1435 msg = self.session.send(self._query_socket, "result_request", content=content)
1436 msg = self.session.send(self._query_socket, "result_request", content=content)
1436 zmq.select([self._query_socket], [], [])
1437 zmq.select([self._query_socket], [], [])
1437 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1438 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1438 if self.debug:
1439 if self.debug:
1439 pprint(msg)
1440 pprint(msg)
1440 content = msg['content']
1441 content = msg['content']
1441 if content['status'] != 'ok':
1442 if content['status'] != 'ok':
1442 raise self._unwrap_exception(content)
1443 raise self._unwrap_exception(content)
1443 buffers = msg['buffers']
1444 buffers = msg['buffers']
1444 else:
1445 else:
1445 content = dict(completed=[],pending=[])
1446 content = dict(completed=[],pending=[])
1446
1447
1447 content['completed'].extend(completed)
1448 content['completed'].extend(completed)
1448
1449
1449 if status_only:
1450 if status_only:
1450 return content
1451 return content
1451
1452
1452 failures = []
1453 failures = []
1453 # load cached results into result:
1454 # load cached results into result:
1454 content.update(local_results)
1455 content.update(local_results)
1455 # update cache with results:
1456 # update cache with results:
1456 for msg_id in sorted(theids):
1457 for msg_id in sorted(theids):
1457 if msg_id in content['completed']:
1458 if msg_id in content['completed']:
1458 rec = content[msg_id]
1459 rec = content[msg_id]
1459 parent = rec['header']
1460 parent = rec['header']
1460 header = rec['result_header']
1461 header = rec['result_header']
1461 rcontent = rec['result_content']
1462 rcontent = rec['result_content']
1462 iodict = rec['io']
1463 iodict = rec['io']
1463 if isinstance(rcontent, str):
1464 if isinstance(rcontent, str):
1464 rcontent = self.session.unpack(rcontent)
1465 rcontent = self.session.unpack(rcontent)
1465
1466
1466 md = self.metadata[msg_id]
1467 md = self.metadata[msg_id]
1467 md.update(self._extract_metadata(header, parent, rcontent))
1468 md.update(self._extract_metadata(header, parent, rcontent))
1468 md.update(iodict)
1469 md.update(iodict)
1469
1470
1470 if rcontent['status'] == 'ok':
1471 if rcontent['status'] == 'ok':
1471 res,buffers = util.unserialize_object(buffers)
1472 res,buffers = util.unserialize_object(buffers)
1472 else:
1473 else:
1473 print rcontent
1474 print rcontent
1474 res = self._unwrap_exception(rcontent)
1475 res = self._unwrap_exception(rcontent)
1475 failures.append(res)
1476 failures.append(res)
1476
1477
1477 self.results[msg_id] = res
1478 self.results[msg_id] = res
1478 content[msg_id] = res
1479 content[msg_id] = res
1479
1480
1480 if len(theids) == 1 and failures:
1481 if len(theids) == 1 and failures:
1481 raise failures[0]
1482 raise failures[0]
1482
1483
1483 error.collect_exceptions(failures, "result_status")
1484 error.collect_exceptions(failures, "result_status")
1484 return content
1485 return content
1485
1486
1486 @spinfirst
1487 @spinfirst
1487 def queue_status(self, targets='all', verbose=False):
1488 def queue_status(self, targets='all', verbose=False):
1488 """Fetch the status of engine queues.
1489 """Fetch the status of engine queues.
1489
1490
1490 Parameters
1491 Parameters
1491 ----------
1492 ----------
1492
1493
1493 targets : int/str/list of ints/strs
1494 targets : int/str/list of ints/strs
1494 the engines whose states are to be queried.
1495 the engines whose states are to be queried.
1495 default : all
1496 default : all
1496 verbose : bool
1497 verbose : bool
1497 Whether to return lengths only, or lists of ids for each element
1498 Whether to return lengths only, or lists of ids for each element
1498 """
1499 """
1499 targets = self._build_targets(targets)[1]
1500 targets = self._build_targets(targets)[1]
1500 content = dict(targets=targets, verbose=verbose)
1501 content = dict(targets=targets, verbose=verbose)
1501 self.session.send(self._query_socket, "queue_request", content=content)
1502 self.session.send(self._query_socket, "queue_request", content=content)
1502 idents,msg = self.session.recv(self._query_socket, 0)
1503 idents,msg = self.session.recv(self._query_socket, 0)
1503 if self.debug:
1504 if self.debug:
1504 pprint(msg)
1505 pprint(msg)
1505 content = msg['content']
1506 content = msg['content']
1506 status = content.pop('status')
1507 status = content.pop('status')
1507 if status != 'ok':
1508 if status != 'ok':
1508 raise self._unwrap_exception(content)
1509 raise self._unwrap_exception(content)
1509 return util.rekey(content)
1510 return util.rekey(content)
1510
1511
1511 @spinfirst
1512 @spinfirst
1512 def purge_results(self, jobs=[], targets=[]):
1513 def purge_results(self, jobs=[], targets=[]):
1513 """Tell the controller to forget results.
1514 """Tell the controller to forget results.
1514
1515
1515 Individual results can be purged by msg_id, or the entire
1516 Individual results can be purged by msg_id, or the entire
1516 history of specific targets can be purged.
1517 history of specific targets can be purged.
1517
1518
1518 Parameters
1519 Parameters
1519 ----------
1520 ----------
1520
1521
1521 jobs : str or list of strs or AsyncResult objects
1522 jobs : str or list of strs or AsyncResult objects
1522 the msg_ids whose results should be forgotten.
1523 the msg_ids whose results should be forgotten.
1523 targets : int/str/list of ints/strs
1524 targets : int/str/list of ints/strs
1524 The targets, by uuid or int_id, whose entire history is to be purged.
1525 The targets, by uuid or int_id, whose entire history is to be purged.
1525 Use `targets='all'` to scrub everything from the controller's memory.
1526 Use `targets='all'` to scrub everything from the controller's memory.
1526
1527
1527 default : None
1528 default : None
1528 """
1529 """
1529 if not targets and not jobs:
1530 if not targets and not jobs:
1530 raise ValueError("Must specify at least one of `targets` and `jobs`")
1531 raise ValueError("Must specify at least one of `targets` and `jobs`")
1531 if targets:
1532 if targets:
1532 targets = self._build_targets(targets)[1]
1533 targets = self._build_targets(targets)[1]
1533
1534
1534 # construct msg_ids from jobs
1535 # construct msg_ids from jobs
1535 msg_ids = []
1536 msg_ids = []
1536 if isinstance(jobs, (basestring,AsyncResult)):
1537 if isinstance(jobs, (basestring,AsyncResult)):
1537 jobs = [jobs]
1538 jobs = [jobs]
1538 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1539 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1539 if bad_ids:
1540 if bad_ids:
1540 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1541 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1541 for j in jobs:
1542 for j in jobs:
1542 if isinstance(j, AsyncResult):
1543 if isinstance(j, AsyncResult):
1543 msg_ids.extend(j.msg_ids)
1544 msg_ids.extend(j.msg_ids)
1544 else:
1545 else:
1545 msg_ids.append(j)
1546 msg_ids.append(j)
1546
1547
1547 content = dict(targets=targets, msg_ids=msg_ids)
1548 content = dict(targets=targets, msg_ids=msg_ids)
1548 self.session.send(self._query_socket, "purge_request", content=content)
1549 self.session.send(self._query_socket, "purge_request", content=content)
1549 idents, msg = self.session.recv(self._query_socket, 0)
1550 idents, msg = self.session.recv(self._query_socket, 0)
1550 if self.debug:
1551 if self.debug:
1551 pprint(msg)
1552 pprint(msg)
1552 content = msg['content']
1553 content = msg['content']
1553 if content['status'] != 'ok':
1554 if content['status'] != 'ok':
1554 raise self._unwrap_exception(content)
1555 raise self._unwrap_exception(content)
1555
1556
1556
1557
1557 __all__ = [ 'Client',
1558 __all__ = [ 'Client',
1558 'depend',
1559 'depend',
1559 'require',
1560 'require',
1560 'remote',
1561 'remote',
1561 'parallel',
1562 'parallel',
1562 'RemoteFunction',
1563 'RemoteFunction',
1563 'ParallelFunction',
1564 'ParallelFunction',
1564 'DirectView',
1565 'DirectView',
1565 'LoadBalancedView',
1566 'LoadBalancedView',
1566 'AsyncResult',
1567 'AsyncResult',
1567 'AsyncMapResult',
1568 'AsyncMapResult',
1568 'Reference'
1569 'Reference'
1569 ]
1570 ]
@@ -1,138 +1,148 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """A simple engine that talks to a controller over 0MQ.
2 """A simple engine that talks to a controller over 0MQ.
3 it handles registration, etc. and launches a kernel
3 it handles registration, etc. and launches a kernel
4 connected to the Controller's Schedulers.
4 connected to the Controller's Schedulers.
5 """
5 """
6 from __future__ import print_function
6 from __future__ import print_function
7
7
8 import sys
8 import sys
9 import time
9 import time
10
10
11 import zmq
11 import zmq
12 from zmq.eventloop import ioloop, zmqstream
12 from zmq.eventloop import ioloop, zmqstream
13
13
14 # internal
14 # internal
15 from IPython.utils.traitlets import Instance, Str, Dict, Int, Type, CFloat
15 from IPython.utils.traitlets import Instance, Str, Dict, Int, Type, CFloat
16 # from IPython.utils.localinterfaces import LOCALHOST
16 # from IPython.utils.localinterfaces import LOCALHOST
17
17
18 from . import heartmonitor
18 from . import heartmonitor
19 from .factory import RegistrationFactory
19 from .factory import RegistrationFactory
20 from .streamkernel import Kernel
20 from .streamkernel import Kernel
21 from .streamsession import Message
21 from .streamsession import Message
22 from .util import disambiguate_url
22 from .util import disambiguate_url
23
23
24 class EngineFactory(RegistrationFactory):
24 class EngineFactory(RegistrationFactory):
25 """IPython engine"""
25 """IPython engine"""
26
26
27 # configurables:
27 # configurables:
28 user_ns=Dict(config=True)
28 user_ns=Dict(config=True)
29 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True)
29 out_stream_factory=Type('IPython.zmq.iostream.OutStream', config=True)
30 display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True)
30 display_hook_factory=Type('IPython.zmq.displayhook.DisplayHook', config=True)
31 location=Str(config=True)
31 location=Str(config=True)
32 timeout=CFloat(2,config=True)
32 timeout=CFloat(2,config=True)
33
33
34 # not configurable:
34 # not configurable:
35 id=Int(allow_none=True)
35 id=Int(allow_none=True)
36 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
36 registrar=Instance('zmq.eventloop.zmqstream.ZMQStream')
37 kernel=Instance(Kernel)
37 kernel=Instance(Kernel)
38
38
39
39
40 def __init__(self, **kwargs):
40 def __init__(self, **kwargs):
41 super(EngineFactory, self).__init__(**kwargs)
41 super(EngineFactory, self).__init__(**kwargs)
42 ctx = self.context
42 ctx = self.context
43
43
44 reg = ctx.socket(zmq.PAIR)
44 reg = ctx.socket(zmq.XREQ)
45 reg.setsockopt(zmq.IDENTITY, self.ident)
45 reg.setsockopt(zmq.IDENTITY, self.ident)
46 reg.connect(self.url)
46 reg.connect(self.url)
47 self.registrar = zmqstream.ZMQStream(reg, self.loop)
47 self.registrar = zmqstream.ZMQStream(reg, self.loop)
48
48
49 def register(self):
49 def register(self):
50 """send the registration_request"""
50 """send the registration_request"""
51
51
52 self.log.info("registering")
52 self.log.info("registering")
53 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
53 content = dict(queue=self.ident, heartbeat=self.ident, control=self.ident)
54 self.registrar.on_recv(self.complete_registration)
54 self.registrar.on_recv(self.complete_registration)
55 # print (self.session.key)
55 # print (self.session.key)
56 self.session.send(self.registrar, "registration_request",content=content)
56 self.session.send(self.registrar, "registration_request",content=content)
57
57
58 def complete_registration(self, msg):
58 def complete_registration(self, msg):
59 # print msg
59 # print msg
60 self._abort_dc.stop()
60 self._abort_dc.stop()
61 ctx = self.context
61 ctx = self.context
62 loop = self.loop
62 loop = self.loop
63 identity = self.ident
63 identity = self.ident
64
64
65 idents,msg = self.session.feed_identities(msg)
65 idents,msg = self.session.feed_identities(msg)
66 msg = Message(self.session.unpack_message(msg))
66 msg = Message(self.session.unpack_message(msg))
67
67
68 if msg.content.status == 'ok':
68 if msg.content.status == 'ok':
69 self.id = int(msg.content.id)
69 self.id = int(msg.content.id)
70
70
71 # create Shell Streams (MUX, Task, etc.):
71 # create Shell Streams (MUX, Task, etc.):
72 queue_addr = msg.content.mux
72 queue_addr = msg.content.mux
73 shell_addrs = [ str(queue_addr) ]
73 shell_addrs = [ str(queue_addr) ]
74 task_addr = msg.content.task
74 task_addr = msg.content.task
75 if task_addr:
75 if task_addr:
76 shell_addrs.append(str(task_addr))
76 shell_addrs.append(str(task_addr))
77 shell_streams = []
77
78 # Uncomment this to go back to two-socket model
79 # shell_streams = []
80 # for addr in shell_addrs:
81 # stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
82 # stream.setsockopt(zmq.IDENTITY, identity)
83 # stream.connect(disambiguate_url(addr, self.location))
84 # shell_streams.append(stream)
85
86 # Now use only one shell stream for mux and tasks
87 stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
88 stream.setsockopt(zmq.IDENTITY, identity)
89 shell_streams = [stream]
78 for addr in shell_addrs:
90 for addr in shell_addrs:
79 stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop)
80 stream.setsockopt(zmq.IDENTITY, identity)
81 stream.connect(disambiguate_url(addr, self.location))
91 stream.connect(disambiguate_url(addr, self.location))
82 shell_streams.append(stream)
92 # end single stream-socket
83
93
84 # control stream:
94 # control stream:
85 control_addr = str(msg.content.control)
95 control_addr = str(msg.content.control)
86 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.PAIR), loop)
96 control_stream = zmqstream.ZMQStream(ctx.socket(zmq.XREP), loop)
87 control_stream.setsockopt(zmq.IDENTITY, identity)
97 control_stream.setsockopt(zmq.IDENTITY, identity)
88 control_stream.connect(disambiguate_url(control_addr, self.location))
98 control_stream.connect(disambiguate_url(control_addr, self.location))
89
99
90 # create iopub stream:
100 # create iopub stream:
91 iopub_addr = msg.content.iopub
101 iopub_addr = msg.content.iopub
92 iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
102 iopub_stream = zmqstream.ZMQStream(ctx.socket(zmq.PUB), loop)
93 iopub_stream.setsockopt(zmq.IDENTITY, identity)
103 iopub_stream.setsockopt(zmq.IDENTITY, identity)
94 iopub_stream.connect(disambiguate_url(iopub_addr, self.location))
104 iopub_stream.connect(disambiguate_url(iopub_addr, self.location))
95
105
96 # launch heartbeat
106 # launch heartbeat
97 hb_addrs = msg.content.heartbeat
107 hb_addrs = msg.content.heartbeat
98 # print (hb_addrs)
108 # print (hb_addrs)
99
109
100 # # Redirect input streams and set a display hook.
110 # # Redirect input streams and set a display hook.
101 if self.out_stream_factory:
111 if self.out_stream_factory:
102 sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
112 sys.stdout = self.out_stream_factory(self.session, iopub_stream, u'stdout')
103 sys.stdout.topic = 'engine.%i.stdout'%self.id
113 sys.stdout.topic = 'engine.%i.stdout'%self.id
104 sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
114 sys.stderr = self.out_stream_factory(self.session, iopub_stream, u'stderr')
105 sys.stderr.topic = 'engine.%i.stderr'%self.id
115 sys.stderr.topic = 'engine.%i.stderr'%self.id
106 if self.display_hook_factory:
116 if self.display_hook_factory:
107 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
117 sys.displayhook = self.display_hook_factory(self.session, iopub_stream)
108 sys.displayhook.topic = 'engine.%i.pyout'%self.id
118 sys.displayhook.topic = 'engine.%i.pyout'%self.id
109
119
110 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
120 self.kernel = Kernel(config=self.config, int_id=self.id, ident=self.ident, session=self.session,
111 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
121 control_stream=control_stream, shell_streams=shell_streams, iopub_stream=iopub_stream,
112 loop=loop, user_ns = self.user_ns, logname=self.log.name)
122 loop=loop, user_ns = self.user_ns, logname=self.log.name)
113 self.kernel.start()
123 self.kernel.start()
114 hb_addrs = [ disambiguate_url(addr, self.location) for addr in hb_addrs ]
124 hb_addrs = [ disambiguate_url(addr, self.location) for addr in hb_addrs ]
115 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
125 heart = heartmonitor.Heart(*map(str, hb_addrs), heart_id=identity)
116 # ioloop.DelayedCallback(heart.start, 1000, self.loop).start()
126 # ioloop.DelayedCallback(heart.start, 1000, self.loop).start()
117 heart.start()
127 heart.start()
118
128
119
129
120 else:
130 else:
121 self.log.fatal("Registration Failed: %s"%msg)
131 self.log.fatal("Registration Failed: %s"%msg)
122 raise Exception("Registration Failed: %s"%msg)
132 raise Exception("Registration Failed: %s"%msg)
123
133
124 self.log.info("Completed registration with id %i"%self.id)
134 self.log.info("Completed registration with id %i"%self.id)
125
135
126
136
127 def abort(self):
137 def abort(self):
128 self.log.fatal("Registration timed out")
138 self.log.fatal("Registration timed out")
129 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
139 self.session.send(self.registrar, "unregistration_request", content=dict(id=self.id))
130 time.sleep(1)
140 time.sleep(1)
131 sys.exit(255)
141 sys.exit(255)
132
142
133 def start(self):
143 def start(self):
134 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
144 dc = ioloop.DelayedCallback(self.register, 0, self.loop)
135 dc.start()
145 dc.start()
136 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
146 self._abort_dc = ioloop.DelayedCallback(self.abort, self.timeout*1000, self.loop)
137 self._abort_dc.start()
147 self._abort_dc.start()
138
148
@@ -1,1055 +1,1039 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller Hub with 0MQ
2 """The IPython Controller Hub with 0MQ
3 This is the master object that handles connections from engines and clients,
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
4 and monitors traffic through the various queues.
5 """
5 """
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010 The IPython Development Team
7 # Copyright (C) 2010 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import sys
18 import sys
19 import time
19 import time
20 from datetime import datetime
20 from datetime import datetime
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24 from zmq.eventloop.zmqstream import ZMQStream
24 from zmq.eventloop.zmqstream import ZMQStream
25
25
26 # internal:
26 # internal:
27 from IPython.utils.importstring import import_item
27 from IPython.utils.importstring import import_item
28 from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool
28 from IPython.utils.traitlets import HasTraits, Instance, Int, CStr, Str, Dict, Set, List, Bool
29
29
30 from .entry_point import select_random_ports
30 from .entry_point import select_random_ports
31 from .factory import RegistrationFactory, LoggingFactory
31 from .factory import RegistrationFactory, LoggingFactory
32
32
33 from . import error
33 from . import error
34 from .heartmonitor import HeartMonitor
34 from .heartmonitor import HeartMonitor
35 from .util import validate_url_container, ISO8601
35 from .util import validate_url_container, ISO8601
36
36
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38 # Code
38 # Code
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40
40
41 def _passer(*args, **kwargs):
41 def _passer(*args, **kwargs):
42 return
42 return
43
43
44 def _printer(*args, **kwargs):
44 def _printer(*args, **kwargs):
45 print (args)
45 print (args)
46 print (kwargs)
46 print (kwargs)
47
47
48 def init_record(msg):
48 def init_record(msg):
49 """Initialize a TaskRecord based on a request."""
49 """Initialize a TaskRecord based on a request."""
50 header = msg['header']
50 header = msg['header']
51 return {
51 return {
52 'msg_id' : header['msg_id'],
52 'msg_id' : header['msg_id'],
53 'header' : header,
53 'header' : header,
54 'content': msg['content'],
54 'content': msg['content'],
55 'buffers': msg['buffers'],
55 'buffers': msg['buffers'],
56 'submitted': datetime.strptime(header['date'], ISO8601),
56 'submitted': datetime.strptime(header['date'], ISO8601),
57 'client_uuid' : None,
57 'client_uuid' : None,
58 'engine_uuid' : None,
58 'engine_uuid' : None,
59 'started': None,
59 'started': None,
60 'completed': None,
60 'completed': None,
61 'resubmitted': None,
61 'resubmitted': None,
62 'result_header' : None,
62 'result_header' : None,
63 'result_content' : None,
63 'result_content' : None,
64 'result_buffers' : None,
64 'result_buffers' : None,
65 'queue' : None,
65 'queue' : None,
66 'pyin' : None,
66 'pyin' : None,
67 'pyout': None,
67 'pyout': None,
68 'pyerr': None,
68 'pyerr': None,
69 'stdout': '',
69 'stdout': '',
70 'stderr': '',
70 'stderr': '',
71 }
71 }
72
72
73
73
74 class EngineConnector(HasTraits):
74 class EngineConnector(HasTraits):
75 """A simple object for accessing the various zmq connections of an object.
75 """A simple object for accessing the various zmq connections of an object.
76 Attributes are:
76 Attributes are:
77 id (int): engine ID
77 id (int): engine ID
78 uuid (str): uuid (unused?)
78 uuid (str): uuid (unused?)
79 queue (str): identity of queue's XREQ socket
79 queue (str): identity of queue's XREQ socket
80 registration (str): identity of registration XREQ socket
80 registration (str): identity of registration XREQ socket
81 heartbeat (str): identity of heartbeat XREQ socket
81 heartbeat (str): identity of heartbeat XREQ socket
82 """
82 """
83 id=Int(0)
83 id=Int(0)
84 queue=Str()
84 queue=Str()
85 control=Str()
85 control=Str()
86 registration=Str()
86 registration=Str()
87 heartbeat=Str()
87 heartbeat=Str()
88 pending=Set()
88 pending=Set()
89
89
90 class HubFactory(RegistrationFactory):
90 class HubFactory(RegistrationFactory):
91 """The Configurable for setting up a Hub."""
91 """The Configurable for setting up a Hub."""
92
92
93 # name of a scheduler scheme
93 # name of a scheduler scheme
94 scheme = Str('leastload', config=True)
94 scheme = Str('leastload', config=True)
95
95
96 # port-pairs for monitoredqueues:
96 # port-pairs for monitoredqueues:
97 hb = Instance(list, config=True)
97 hb = Instance(list, config=True)
98 def _hb_default(self):
98 def _hb_default(self):
99 return select_random_ports(2)
99 return select_random_ports(2)
100
100
101 mux = Instance(list, config=True)
101 mux = Instance(list, config=True)
102 def _mux_default(self):
102 def _mux_default(self):
103 return select_random_ports(2)
103 return select_random_ports(2)
104
104
105 task = Instance(list, config=True)
105 task = Instance(list, config=True)
106 def _task_default(self):
106 def _task_default(self):
107 return select_random_ports(2)
107 return select_random_ports(2)
108
108
109 control = Instance(list, config=True)
109 control = Instance(list, config=True)
110 def _control_default(self):
110 def _control_default(self):
111 return select_random_ports(2)
111 return select_random_ports(2)
112
112
113 iopub = Instance(list, config=True)
113 iopub = Instance(list, config=True)
114 def _iopub_default(self):
114 def _iopub_default(self):
115 return select_random_ports(2)
115 return select_random_ports(2)
116
116
117 # single ports:
117 # single ports:
118 mon_port = Instance(int, config=True)
118 mon_port = Instance(int, config=True)
119 def _mon_port_default(self):
119 def _mon_port_default(self):
120 return select_random_ports(1)[0]
120 return select_random_ports(1)[0]
121
121
122 query_port = Instance(int, config=True)
123 def _query_port_default(self):
124 return select_random_ports(1)[0]
125
126 notifier_port = Instance(int, config=True)
122 notifier_port = Instance(int, config=True)
127 def _notifier_port_default(self):
123 def _notifier_port_default(self):
128 return select_random_ports(1)[0]
124 return select_random_ports(1)[0]
129
125
130 ping = Int(1000, config=True) # ping frequency
126 ping = Int(1000, config=True) # ping frequency
131
127
132 engine_ip = CStr('127.0.0.1', config=True)
128 engine_ip = CStr('127.0.0.1', config=True)
133 engine_transport = CStr('tcp', config=True)
129 engine_transport = CStr('tcp', config=True)
134
130
135 client_ip = CStr('127.0.0.1', config=True)
131 client_ip = CStr('127.0.0.1', config=True)
136 client_transport = CStr('tcp', config=True)
132 client_transport = CStr('tcp', config=True)
137
133
138 monitor_ip = CStr('127.0.0.1', config=True)
134 monitor_ip = CStr('127.0.0.1', config=True)
139 monitor_transport = CStr('tcp', config=True)
135 monitor_transport = CStr('tcp', config=True)
140
136
141 monitor_url = CStr('')
137 monitor_url = CStr('')
142
138
143 db_class = CStr('IPython.zmq.parallel.dictdb.DictDB', config=True)
139 db_class = CStr('IPython.zmq.parallel.dictdb.DictDB', config=True)
144
140
145 # not configurable
141 # not configurable
146 db = Instance('IPython.zmq.parallel.dictdb.BaseDB')
142 db = Instance('IPython.zmq.parallel.dictdb.BaseDB')
147 heartmonitor = Instance('IPython.zmq.parallel.heartmonitor.HeartMonitor')
143 heartmonitor = Instance('IPython.zmq.parallel.heartmonitor.HeartMonitor')
148 subconstructors = List()
144 subconstructors = List()
149 _constructed = Bool(False)
145 _constructed = Bool(False)
150
146
151 def _ip_changed(self, name, old, new):
147 def _ip_changed(self, name, old, new):
152 self.engine_ip = new
148 self.engine_ip = new
153 self.client_ip = new
149 self.client_ip = new
154 self.monitor_ip = new
150 self.monitor_ip = new
155 self._update_monitor_url()
151 self._update_monitor_url()
156
152
157 def _update_monitor_url(self):
153 def _update_monitor_url(self):
158 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
154 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
159
155
160 def _transport_changed(self, name, old, new):
156 def _transport_changed(self, name, old, new):
161 self.engine_transport = new
157 self.engine_transport = new
162 self.client_transport = new
158 self.client_transport = new
163 self.monitor_transport = new
159 self.monitor_transport = new
164 self._update_monitor_url()
160 self._update_monitor_url()
165
161
166 def __init__(self, **kwargs):
162 def __init__(self, **kwargs):
167 super(HubFactory, self).__init__(**kwargs)
163 super(HubFactory, self).__init__(**kwargs)
168 self._update_monitor_url()
164 self._update_monitor_url()
169 # self.on_trait_change(self._sync_ips, 'ip')
165 # self.on_trait_change(self._sync_ips, 'ip')
170 # self.on_trait_change(self._sync_transports, 'transport')
166 # self.on_trait_change(self._sync_transports, 'transport')
171 self.subconstructors.append(self.construct_hub)
167 self.subconstructors.append(self.construct_hub)
172
168
173
169
174 def construct(self):
170 def construct(self):
175 assert not self._constructed, "already constructed!"
171 assert not self._constructed, "already constructed!"
176
172
177 for subc in self.subconstructors:
173 for subc in self.subconstructors:
178 subc()
174 subc()
179
175
180 self._constructed = True
176 self._constructed = True
181
177
182
178
183 def start(self):
179 def start(self):
184 assert self._constructed, "must be constructed by self.construct() first!"
180 assert self._constructed, "must be constructed by self.construct() first!"
185 self.heartmonitor.start()
181 self.heartmonitor.start()
186 self.log.info("Heartmonitor started")
182 self.log.info("Heartmonitor started")
187
183
188 def construct_hub(self):
184 def construct_hub(self):
189 """construct"""
185 """construct"""
190 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
186 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
191 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
187 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
192
188
193 ctx = self.context
189 ctx = self.context
194 loop = self.loop
190 loop = self.loop
195
191
196 # Registrar socket
192 # Registrar socket
197 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
193 q = ZMQStream(ctx.socket(zmq.XREP), loop)
198 reg.bind(client_iface % self.regport)
194 q.bind(client_iface % self.regport)
199 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
195 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
200 if self.client_ip != self.engine_ip:
196 if self.client_ip != self.engine_ip:
201 reg.bind(engine_iface % self.regport)
197 q.bind(engine_iface % self.regport)
202 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
198 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
203
199
204 ### Engine connections ###
200 ### Engine connections ###
205
201
206 # heartbeat
202 # heartbeat
207 hpub = ctx.socket(zmq.PUB)
203 hpub = ctx.socket(zmq.PUB)
208 hpub.bind(engine_iface % self.hb[0])
204 hpub.bind(engine_iface % self.hb[0])
209 hrep = ctx.socket(zmq.XREP)
205 hrep = ctx.socket(zmq.XREP)
210 hrep.bind(engine_iface % self.hb[1])
206 hrep.bind(engine_iface % self.hb[1])
211 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
207 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
212 period=self.ping, logname=self.log.name)
208 period=self.ping, logname=self.log.name)
213
209
214 ### Client connections ###
210 ### Client connections ###
215 # Clientele socket
216 c = ZMQStream(ctx.socket(zmq.XREP), loop)
217 c.bind(client_iface%self.query_port)
218 # Notifier socket
211 # Notifier socket
219 n = ZMQStream(ctx.socket(zmq.PUB), loop)
212 n = ZMQStream(ctx.socket(zmq.PUB), loop)
220 n.bind(client_iface%self.notifier_port)
213 n.bind(client_iface%self.notifier_port)
221
214
222 ### build and launch the queues ###
215 ### build and launch the queues ###
223
216
224 # monitor socket
217 # monitor socket
225 sub = ctx.socket(zmq.SUB)
218 sub = ctx.socket(zmq.SUB)
226 sub.setsockopt(zmq.SUBSCRIBE, "")
219 sub.setsockopt(zmq.SUBSCRIBE, "")
227 sub.bind(self.monitor_url)
220 sub.bind(self.monitor_url)
228 sub.bind('inproc://monitor')
221 sub.bind('inproc://monitor')
229 sub = ZMQStream(sub, loop)
222 sub = ZMQStream(sub, loop)
230
223
231 # connect the db
224 # connect the db
232 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
225 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
233 cdir = self.config.Global.cluster_dir
226 # cdir = self.config.Global.cluster_dir
234 self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
227 self.db = import_item(self.db_class)(session=self.session.session, config=self.config)
235 time.sleep(.25)
228 time.sleep(.25)
236
229
237 # build connection dicts
230 # build connection dicts
238 self.engine_info = {
231 self.engine_info = {
239 'control' : engine_iface%self.control[1],
232 'control' : engine_iface%self.control[1],
240 'mux': engine_iface%self.mux[1],
233 'mux': engine_iface%self.mux[1],
241 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
234 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
242 'task' : engine_iface%self.task[1],
235 'task' : engine_iface%self.task[1],
243 'iopub' : engine_iface%self.iopub[1],
236 'iopub' : engine_iface%self.iopub[1],
244 # 'monitor' : engine_iface%self.mon_port,
237 # 'monitor' : engine_iface%self.mon_port,
245 }
238 }
246
239
247 self.client_info = {
240 self.client_info = {
248 'control' : client_iface%self.control[0],
241 'control' : client_iface%self.control[0],
249 'query': client_iface%self.query_port,
250 'mux': client_iface%self.mux[0],
242 'mux': client_iface%self.mux[0],
251 'task' : (self.scheme, client_iface%self.task[0]),
243 'task' : (self.scheme, client_iface%self.task[0]),
252 'iopub' : client_iface%self.iopub[0],
244 'iopub' : client_iface%self.iopub[0],
253 'notification': client_iface%self.notifier_port
245 'notification': client_iface%self.notifier_port
254 }
246 }
255 self.log.debug("Hub engine addrs: %s"%self.engine_info)
247 self.log.debug("Hub engine addrs: %s"%self.engine_info)
256 self.log.debug("Hub client addrs: %s"%self.client_info)
248 self.log.debug("Hub client addrs: %s"%self.client_info)
257 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
249 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
258 registrar=reg, clientele=c, notifier=n, db=self.db,
250 query=q, notifier=n, db=self.db,
259 engine_info=self.engine_info, client_info=self.client_info,
251 engine_info=self.engine_info, client_info=self.client_info,
260 logname=self.log.name)
252 logname=self.log.name)
261
253
262
254
263 class Hub(LoggingFactory):
255 class Hub(LoggingFactory):
264 """The IPython Controller Hub with 0MQ connections
256 """The IPython Controller Hub with 0MQ connections
265
257
266 Parameters
258 Parameters
267 ==========
259 ==========
268 loop: zmq IOLoop instance
260 loop: zmq IOLoop instance
269 session: StreamSession object
261 session: StreamSession object
270 <removed> context: zmq context for creating new connections (?)
262 <removed> context: zmq context for creating new connections (?)
271 queue: ZMQStream for monitoring the command queue (SUB)
263 queue: ZMQStream for monitoring the command queue (SUB)
272 registrar: ZMQStream for engine registration requests (XREP)
264 query: ZMQStream for engine registration and client queries requests (XREP)
273 heartbeat: HeartMonitor object checking the pulse of the engines
265 heartbeat: HeartMonitor object checking the pulse of the engines
274 clientele: ZMQStream for client connections (XREP)
275 not used for jobs, only query/control commands
276 notifier: ZMQStream for broadcasting engine registration changes (PUB)
266 notifier: ZMQStream for broadcasting engine registration changes (PUB)
277 db: connection to db for out of memory logging of commands
267 db: connection to db for out of memory logging of commands
278 NotImplemented
268 NotImplemented
279 engine_info: dict of zmq connection information for engines to connect
269 engine_info: dict of zmq connection information for engines to connect
280 to the queues.
270 to the queues.
281 client_info: dict of zmq connection information for engines to connect
271 client_info: dict of zmq connection information for engines to connect
282 to the queues.
272 to the queues.
283 """
273 """
284 # internal data structures:
274 # internal data structures:
285 ids=Set() # engine IDs
275 ids=Set() # engine IDs
286 keytable=Dict()
276 keytable=Dict()
287 by_ident=Dict()
277 by_ident=Dict()
288 engines=Dict()
278 engines=Dict()
289 clients=Dict()
279 clients=Dict()
290 hearts=Dict()
280 hearts=Dict()
291 pending=Set()
281 pending=Set()
292 queues=Dict() # pending msg_ids keyed by engine_id
282 queues=Dict() # pending msg_ids keyed by engine_id
293 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
283 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
294 completed=Dict() # completed msg_ids keyed by engine_id
284 completed=Dict() # completed msg_ids keyed by engine_id
295 all_completed=Set() # completed msg_ids keyed by engine_id
285 all_completed=Set() # completed msg_ids keyed by engine_id
296 # mia=None
286 # mia=None
297 incoming_registrations=Dict()
287 incoming_registrations=Dict()
298 registration_timeout=Int()
288 registration_timeout=Int()
299 _idcounter=Int(0)
289 _idcounter=Int(0)
300
290
301 # objects from constructor:
291 # objects from constructor:
302 loop=Instance(ioloop.IOLoop)
292 loop=Instance(ioloop.IOLoop)
303 registrar=Instance(ZMQStream)
293 query=Instance(ZMQStream)
304 clientele=Instance(ZMQStream)
305 monitor=Instance(ZMQStream)
294 monitor=Instance(ZMQStream)
306 heartmonitor=Instance(HeartMonitor)
295 heartmonitor=Instance(HeartMonitor)
307 notifier=Instance(ZMQStream)
296 notifier=Instance(ZMQStream)
308 db=Instance(object)
297 db=Instance(object)
309 client_info=Dict()
298 client_info=Dict()
310 engine_info=Dict()
299 engine_info=Dict()
311
300
312
301
313 def __init__(self, **kwargs):
302 def __init__(self, **kwargs):
314 """
303 """
315 # universal:
304 # universal:
316 loop: IOLoop for creating future connections
305 loop: IOLoop for creating future connections
317 session: streamsession for sending serialized data
306 session: streamsession for sending serialized data
318 # engine:
307 # engine:
319 queue: ZMQStream for monitoring queue messages
308 queue: ZMQStream for monitoring queue messages
320 registrar: ZMQStream for engine registration
309 query: ZMQStream for engine+client registration and client requests
321 heartbeat: HeartMonitor object for tracking engines
310 heartbeat: HeartMonitor object for tracking engines
322 # client:
323 clientele: ZMQStream for client connections
324 # extra:
311 # extra:
325 db: ZMQStream for db connection (NotImplemented)
312 db: ZMQStream for db connection (NotImplemented)
326 engine_info: zmq address/protocol dict for engine connections
313 engine_info: zmq address/protocol dict for engine connections
327 client_info: zmq address/protocol dict for client connections
314 client_info: zmq address/protocol dict for client connections
328 """
315 """
329
316
330 super(Hub, self).__init__(**kwargs)
317 super(Hub, self).__init__(**kwargs)
331 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
318 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
332
319
333 # validate connection dicts:
320 # validate connection dicts:
334 for k,v in self.client_info.iteritems():
321 for k,v in self.client_info.iteritems():
335 if k == 'task':
322 if k == 'task':
336 validate_url_container(v[1])
323 validate_url_container(v[1])
337 else:
324 else:
338 validate_url_container(v)
325 validate_url_container(v)
339 # validate_url_container(self.client_info)
326 # validate_url_container(self.client_info)
340 validate_url_container(self.engine_info)
327 validate_url_container(self.engine_info)
341
328
342 # register our callbacks
329 # register our callbacks
343 self.registrar.on_recv(self.dispatch_register_request)
330 self.query.on_recv(self.dispatch_query)
344 self.clientele.on_recv(self.dispatch_client_msg)
345 self.monitor.on_recv(self.dispatch_monitor_traffic)
331 self.monitor.on_recv(self.dispatch_monitor_traffic)
346
332
347 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
333 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
348 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
334 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
349
335
350 self.monitor_handlers = { 'in' : self.save_queue_request,
336 self.monitor_handlers = { 'in' : self.save_queue_request,
351 'out': self.save_queue_result,
337 'out': self.save_queue_result,
352 'intask': self.save_task_request,
338 'intask': self.save_task_request,
353 'outtask': self.save_task_result,
339 'outtask': self.save_task_result,
354 'tracktask': self.save_task_destination,
340 'tracktask': self.save_task_destination,
355 'incontrol': _passer,
341 'incontrol': _passer,
356 'outcontrol': _passer,
342 'outcontrol': _passer,
357 'iopub': self.save_iopub_message,
343 'iopub': self.save_iopub_message,
358 }
344 }
359
345
360 self.client_handlers = {'queue_request': self.queue_status,
346 self.query_handlers = {'queue_request': self.queue_status,
361 'result_request': self.get_results,
347 'result_request': self.get_results,
362 'purge_request': self.purge_results,
348 'purge_request': self.purge_results,
363 'load_request': self.check_load,
349 'load_request': self.check_load,
364 'resubmit_request': self.resubmit_task,
350 'resubmit_request': self.resubmit_task,
365 'shutdown_request': self.shutdown_request,
351 'shutdown_request': self.shutdown_request,
366 }
352 'registration_request' : self.register_engine,
367
368 self.registrar_handlers = {'registration_request' : self.register_engine,
369 'unregistration_request' : self.unregister_engine,
353 'unregistration_request' : self.unregister_engine,
370 'connection_request': self.connection_request,
354 'connection_request': self.connection_request,
371 }
355 }
372
356
373 self.log.info("hub::created hub")
357 self.log.info("hub::created hub")
374
358
375 @property
359 @property
376 def _next_id(self):
360 def _next_id(self):
377 """gemerate a new ID.
361 """gemerate a new ID.
378
362
379 No longer reuse old ids, just count from 0."""
363 No longer reuse old ids, just count from 0."""
380 newid = self._idcounter
364 newid = self._idcounter
381 self._idcounter += 1
365 self._idcounter += 1
382 return newid
366 return newid
383 # newid = 0
367 # newid = 0
384 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
368 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
385 # # print newid, self.ids, self.incoming_registrations
369 # # print newid, self.ids, self.incoming_registrations
386 # while newid in self.ids or newid in incoming:
370 # while newid in self.ids or newid in incoming:
387 # newid += 1
371 # newid += 1
388 # return newid
372 # return newid
389
373
390 #-----------------------------------------------------------------------------
374 #-----------------------------------------------------------------------------
391 # message validation
375 # message validation
392 #-----------------------------------------------------------------------------
376 #-----------------------------------------------------------------------------
393
377
394 def _validate_targets(self, targets):
378 def _validate_targets(self, targets):
395 """turn any valid targets argument into a list of integer ids"""
379 """turn any valid targets argument into a list of integer ids"""
396 if targets is None:
380 if targets is None:
397 # default to all
381 # default to all
398 targets = self.ids
382 targets = self.ids
399
383
400 if isinstance(targets, (int,str,unicode)):
384 if isinstance(targets, (int,str,unicode)):
401 # only one target specified
385 # only one target specified
402 targets = [targets]
386 targets = [targets]
403 _targets = []
387 _targets = []
404 for t in targets:
388 for t in targets:
405 # map raw identities to ids
389 # map raw identities to ids
406 if isinstance(t, (str,unicode)):
390 if isinstance(t, (str,unicode)):
407 t = self.by_ident.get(t, t)
391 t = self.by_ident.get(t, t)
408 _targets.append(t)
392 _targets.append(t)
409 targets = _targets
393 targets = _targets
410 bad_targets = [ t for t in targets if t not in self.ids ]
394 bad_targets = [ t for t in targets if t not in self.ids ]
411 if bad_targets:
395 if bad_targets:
412 raise IndexError("No Such Engine: %r"%bad_targets)
396 raise IndexError("No Such Engine: %r"%bad_targets)
413 if not targets:
397 if not targets:
414 raise IndexError("No Engines Registered")
398 raise IndexError("No Engines Registered")
415 return targets
399 return targets
416
400
417 #-----------------------------------------------------------------------------
401 #-----------------------------------------------------------------------------
418 # dispatch methods (1 per stream)
402 # dispatch methods (1 per stream)
419 #-----------------------------------------------------------------------------
403 #-----------------------------------------------------------------------------
420
404
421 def dispatch_register_request(self, msg):
405 # def dispatch_registration_request(self, msg):
422 """"""
406 # """"""
423 self.log.debug("registration::dispatch_register_request(%s)"%msg)
407 # self.log.debug("registration::dispatch_register_request(%s)"%msg)
424 idents,msg = self.session.feed_identities(msg)
408 # idents,msg = self.session.feed_identities(msg)
425 if not idents:
409 # if not idents:
426 self.log.error("Bad Queue Message: %s"%msg, exc_info=True)
410 # self.log.error("Bad Query Message: %s"%msg, exc_info=True)
427 return
411 # return
428 try:
412 # try:
429 msg = self.session.unpack_message(msg,content=True)
413 # msg = self.session.unpack_message(msg,content=True)
430 except:
414 # except:
431 self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
415 # self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
432 return
416 # return
433
417 #
434 msg_type = msg['msg_type']
418 # msg_type = msg['msg_type']
435 content = msg['content']
419 # content = msg['content']
436
420 #
437 handler = self.registrar_handlers.get(msg_type, None)
421 # handler = self.query_handlers.get(msg_type, None)
438 if handler is None:
422 # if handler is None:
439 self.log.error("registration::got bad registration message: %s"%msg)
423 # self.log.error("registration::got bad registration message: %s"%msg)
440 else:
424 # else:
441 handler(idents, msg)
425 # handler(idents, msg)
442
426
443 def dispatch_monitor_traffic(self, msg):
427 def dispatch_monitor_traffic(self, msg):
444 """all ME and Task queue messages come through here, as well as
428 """all ME and Task queue messages come through here, as well as
445 IOPub traffic."""
429 IOPub traffic."""
446 self.log.debug("monitor traffic: %s"%msg[:2])
430 self.log.debug("monitor traffic: %s"%msg[:2])
447 switch = msg[0]
431 switch = msg[0]
448 idents, msg = self.session.feed_identities(msg[1:])
432 idents, msg = self.session.feed_identities(msg[1:])
449 if not idents:
433 if not idents:
450 self.log.error("Bad Monitor Message: %s"%msg)
434 self.log.error("Bad Monitor Message: %s"%msg)
451 return
435 return
452 handler = self.monitor_handlers.get(switch, None)
436 handler = self.monitor_handlers.get(switch, None)
453 if handler is not None:
437 if handler is not None:
454 handler(idents, msg)
438 handler(idents, msg)
455 else:
439 else:
456 self.log.error("Invalid monitor topic: %s"%switch)
440 self.log.error("Invalid monitor topic: %s"%switch)
457
441
458
442
459 def dispatch_client_msg(self, msg):
443 def dispatch_query(self, msg):
460 """Route messages from clients"""
444 """Route registration requests and queries from clients."""
461 idents, msg = self.session.feed_identities(msg)
445 idents, msg = self.session.feed_identities(msg)
462 if not idents:
446 if not idents:
463 self.log.error("Bad Client Message: %s"%msg)
447 self.log.error("Bad Query Message: %s"%msg)
464 return
448 return
465 client_id = idents[0]
449 client_id = idents[0]
466 try:
450 try:
467 msg = self.session.unpack_message(msg, content=True)
451 msg = self.session.unpack_message(msg, content=True)
468 except:
452 except:
469 content = error.wrap_exception()
453 content = error.wrap_exception()
470 self.log.error("Bad Client Message: %s"%msg, exc_info=True)
454 self.log.error("Bad Query Message: %s"%msg, exc_info=True)
471 self.session.send(self.clientele, "hub_error", ident=client_id,
455 self.session.send(self.query, "hub_error", ident=client_id,
472 content=content)
456 content=content)
473 return
457 return
474
458
475 # print client_id, header, parent, content
459 # print client_id, header, parent, content
476 #switch on message type:
460 #switch on message type:
477 msg_type = msg['msg_type']
461 msg_type = msg['msg_type']
478 self.log.info("client:: client %s requested %s"%(client_id, msg_type))
462 self.log.info("client::client %s requested %s"%(client_id, msg_type))
479 handler = self.client_handlers.get(msg_type, None)
463 handler = self.query_handlers.get(msg_type, None)
480 try:
464 try:
481 assert handler is not None, "Bad Message Type: %s"%msg_type
465 assert handler is not None, "Bad Message Type: %s"%msg_type
482 except:
466 except:
483 content = error.wrap_exception()
467 content = error.wrap_exception()
484 self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
468 self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
485 self.session.send(self.clientele, "hub_error", ident=client_id,
469 self.session.send(self.query, "hub_error", ident=client_id,
486 content=content)
470 content=content)
487 return
471 return
488 else:
472 else:
489 handler(client_id, msg)
473 handler(idents, msg)
490
474
491 def dispatch_db(self, msg):
475 def dispatch_db(self, msg):
492 """"""
476 """"""
493 raise NotImplementedError
477 raise NotImplementedError
494
478
495 #---------------------------------------------------------------------------
479 #---------------------------------------------------------------------------
496 # handler methods (1 per event)
480 # handler methods (1 per event)
497 #---------------------------------------------------------------------------
481 #---------------------------------------------------------------------------
498
482
499 #----------------------- Heartbeat --------------------------------------
483 #----------------------- Heartbeat --------------------------------------
500
484
501 def handle_new_heart(self, heart):
485 def handle_new_heart(self, heart):
502 """handler to attach to heartbeater.
486 """handler to attach to heartbeater.
503 Called when a new heart starts to beat.
487 Called when a new heart starts to beat.
504 Triggers completion of registration."""
488 Triggers completion of registration."""
505 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
489 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
506 if heart not in self.incoming_registrations:
490 if heart not in self.incoming_registrations:
507 self.log.info("heartbeat::ignoring new heart: %r"%heart)
491 self.log.info("heartbeat::ignoring new heart: %r"%heart)
508 else:
492 else:
509 self.finish_registration(heart)
493 self.finish_registration(heart)
510
494
511
495
512 def handle_heart_failure(self, heart):
496 def handle_heart_failure(self, heart):
513 """handler to attach to heartbeater.
497 """handler to attach to heartbeater.
514 called when a previously registered heart fails to respond to beat request.
498 called when a previously registered heart fails to respond to beat request.
515 triggers unregistration"""
499 triggers unregistration"""
516 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
500 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
517 eid = self.hearts.get(heart, None)
501 eid = self.hearts.get(heart, None)
518 queue = self.engines[eid].queue
502 queue = self.engines[eid].queue
519 if eid is None:
503 if eid is None:
520 self.log.info("heartbeat::ignoring heart failure %r"%heart)
504 self.log.info("heartbeat::ignoring heart failure %r"%heart)
521 else:
505 else:
522 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
506 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
523
507
524 #----------------------- MUX Queue Traffic ------------------------------
508 #----------------------- MUX Queue Traffic ------------------------------
525
509
526 def save_queue_request(self, idents, msg):
510 def save_queue_request(self, idents, msg):
527 if len(idents) < 2:
511 if len(idents) < 2:
528 self.log.error("invalid identity prefix: %s"%idents)
512 self.log.error("invalid identity prefix: %s"%idents)
529 return
513 return
530 queue_id, client_id = idents[:2]
514 queue_id, client_id = idents[:2]
531 try:
515 try:
532 msg = self.session.unpack_message(msg, content=False)
516 msg = self.session.unpack_message(msg, content=False)
533 except:
517 except:
534 self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
518 self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
535 return
519 return
536
520
537 eid = self.by_ident.get(queue_id, None)
521 eid = self.by_ident.get(queue_id, None)
538 if eid is None:
522 if eid is None:
539 self.log.error("queue::target %r not registered"%queue_id)
523 self.log.error("queue::target %r not registered"%queue_id)
540 self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
524 self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
541 return
525 return
542
526
543 header = msg['header']
527 header = msg['header']
544 msg_id = header['msg_id']
528 msg_id = header['msg_id']
545 record = init_record(msg)
529 record = init_record(msg)
546 record['engine_uuid'] = queue_id
530 record['engine_uuid'] = queue_id
547 record['client_uuid'] = client_id
531 record['client_uuid'] = client_id
548 record['queue'] = 'mux'
532 record['queue'] = 'mux'
549
533
550 self.pending.add(msg_id)
534 self.pending.add(msg_id)
551 self.queues[eid].append(msg_id)
535 self.queues[eid].append(msg_id)
552 self.db.add_record(msg_id, record)
536 self.db.add_record(msg_id, record)
553
537
554 def save_queue_result(self, idents, msg):
538 def save_queue_result(self, idents, msg):
555 if len(idents) < 2:
539 if len(idents) < 2:
556 self.log.error("invalid identity prefix: %s"%idents)
540 self.log.error("invalid identity prefix: %s"%idents)
557 return
541 return
558
542
559 client_id, queue_id = idents[:2]
543 client_id, queue_id = idents[:2]
560 try:
544 try:
561 msg = self.session.unpack_message(msg, content=False)
545 msg = self.session.unpack_message(msg, content=False)
562 except:
546 except:
563 self.log.error("queue::engine %r sent invalid message to %r: %s"%(
547 self.log.error("queue::engine %r sent invalid message to %r: %s"%(
564 queue_id,client_id, msg), exc_info=True)
548 queue_id,client_id, msg), exc_info=True)
565 return
549 return
566
550
567 eid = self.by_ident.get(queue_id, None)
551 eid = self.by_ident.get(queue_id, None)
568 if eid is None:
552 if eid is None:
569 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
553 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
570 self.log.debug("queue:: %s"%msg[2:])
554 self.log.debug("queue:: %s"%msg[2:])
571 return
555 return
572
556
573 parent = msg['parent_header']
557 parent = msg['parent_header']
574 if not parent:
558 if not parent:
575 return
559 return
576 msg_id = parent['msg_id']
560 msg_id = parent['msg_id']
577 if msg_id in self.pending:
561 if msg_id in self.pending:
578 self.pending.remove(msg_id)
562 self.pending.remove(msg_id)
579 self.all_completed.add(msg_id)
563 self.all_completed.add(msg_id)
580 self.queues[eid].remove(msg_id)
564 self.queues[eid].remove(msg_id)
581 self.completed[eid].append(msg_id)
565 self.completed[eid].append(msg_id)
582 elif msg_id not in self.all_completed:
566 elif msg_id not in self.all_completed:
583 # it could be a result from a dead engine that died before delivering the
567 # it could be a result from a dead engine that died before delivering the
584 # result
568 # result
585 self.log.warn("queue:: unknown msg finished %s"%msg_id)
569 self.log.warn("queue:: unknown msg finished %s"%msg_id)
586 return
570 return
587 # update record anyway, because the unregistration could have been premature
571 # update record anyway, because the unregistration could have been premature
588 rheader = msg['header']
572 rheader = msg['header']
589 completed = datetime.strptime(rheader['date'], ISO8601)
573 completed = datetime.strptime(rheader['date'], ISO8601)
590 started = rheader.get('started', None)
574 started = rheader.get('started', None)
591 if started is not None:
575 if started is not None:
592 started = datetime.strptime(started, ISO8601)
576 started = datetime.strptime(started, ISO8601)
593 result = {
577 result = {
594 'result_header' : rheader,
578 'result_header' : rheader,
595 'result_content': msg['content'],
579 'result_content': msg['content'],
596 'started' : started,
580 'started' : started,
597 'completed' : completed
581 'completed' : completed
598 }
582 }
599
583
600 result['result_buffers'] = msg['buffers']
584 result['result_buffers'] = msg['buffers']
601 self.db.update_record(msg_id, result)
585 self.db.update_record(msg_id, result)
602
586
603
587
604 #--------------------- Task Queue Traffic ------------------------------
588 #--------------------- Task Queue Traffic ------------------------------
605
589
606 def save_task_request(self, idents, msg):
590 def save_task_request(self, idents, msg):
607 """Save the submission of a task."""
591 """Save the submission of a task."""
608 client_id = idents[0]
592 client_id = idents[0]
609
593
610 try:
594 try:
611 msg = self.session.unpack_message(msg, content=False)
595 msg = self.session.unpack_message(msg, content=False)
612 except:
596 except:
613 self.log.error("task::client %r sent invalid task message: %s"%(
597 self.log.error("task::client %r sent invalid task message: %s"%(
614 client_id, msg), exc_info=True)
598 client_id, msg), exc_info=True)
615 return
599 return
616 record = init_record(msg)
600 record = init_record(msg)
617
601
618 record['client_uuid'] = client_id
602 record['client_uuid'] = client_id
619 record['queue'] = 'task'
603 record['queue'] = 'task'
620 header = msg['header']
604 header = msg['header']
621 msg_id = header['msg_id']
605 msg_id = header['msg_id']
622 self.pending.add(msg_id)
606 self.pending.add(msg_id)
623 self.db.add_record(msg_id, record)
607 self.db.add_record(msg_id, record)
624
608
625 def save_task_result(self, idents, msg):
609 def save_task_result(self, idents, msg):
626 """save the result of a completed task."""
610 """save the result of a completed task."""
627 client_id = idents[0]
611 client_id = idents[0]
628 try:
612 try:
629 msg = self.session.unpack_message(msg, content=False)
613 msg = self.session.unpack_message(msg, content=False)
630 except:
614 except:
631 self.log.error("task::invalid task result message send to %r: %s"%(
615 self.log.error("task::invalid task result message send to %r: %s"%(
632 client_id, msg), exc_info=True)
616 client_id, msg), exc_info=True)
633 raise
617 raise
634 return
618 return
635
619
636 parent = msg['parent_header']
620 parent = msg['parent_header']
637 if not parent:
621 if not parent:
638 # print msg
622 # print msg
639 self.log.warn("Task %r had no parent!"%msg)
623 self.log.warn("Task %r had no parent!"%msg)
640 return
624 return
641 msg_id = parent['msg_id']
625 msg_id = parent['msg_id']
642
626
643 header = msg['header']
627 header = msg['header']
644 engine_uuid = header.get('engine', None)
628 engine_uuid = header.get('engine', None)
645 eid = self.by_ident.get(engine_uuid, None)
629 eid = self.by_ident.get(engine_uuid, None)
646
630
647 if msg_id in self.pending:
631 if msg_id in self.pending:
648 self.pending.remove(msg_id)
632 self.pending.remove(msg_id)
649 self.all_completed.add(msg_id)
633 self.all_completed.add(msg_id)
650 if eid is not None:
634 if eid is not None:
651 self.completed[eid].append(msg_id)
635 self.completed[eid].append(msg_id)
652 if msg_id in self.tasks[eid]:
636 if msg_id in self.tasks[eid]:
653 self.tasks[eid].remove(msg_id)
637 self.tasks[eid].remove(msg_id)
654 completed = datetime.strptime(header['date'], ISO8601)
638 completed = datetime.strptime(header['date'], ISO8601)
655 started = header.get('started', None)
639 started = header.get('started', None)
656 if started is not None:
640 if started is not None:
657 started = datetime.strptime(started, ISO8601)
641 started = datetime.strptime(started, ISO8601)
658 result = {
642 result = {
659 'result_header' : header,
643 'result_header' : header,
660 'result_content': msg['content'],
644 'result_content': msg['content'],
661 'started' : started,
645 'started' : started,
662 'completed' : completed,
646 'completed' : completed,
663 'engine_uuid': engine_uuid
647 'engine_uuid': engine_uuid
664 }
648 }
665
649
666 result['result_buffers'] = msg['buffers']
650 result['result_buffers'] = msg['buffers']
667 self.db.update_record(msg_id, result)
651 self.db.update_record(msg_id, result)
668
652
669 else:
653 else:
670 self.log.debug("task::unknown task %s finished"%msg_id)
654 self.log.debug("task::unknown task %s finished"%msg_id)
671
655
672 def save_task_destination(self, idents, msg):
656 def save_task_destination(self, idents, msg):
673 try:
657 try:
674 msg = self.session.unpack_message(msg, content=True)
658 msg = self.session.unpack_message(msg, content=True)
675 except:
659 except:
676 self.log.error("task::invalid task tracking message", exc_info=True)
660 self.log.error("task::invalid task tracking message", exc_info=True)
677 return
661 return
678 content = msg['content']
662 content = msg['content']
679 # print (content)
663 # print (content)
680 msg_id = content['msg_id']
664 msg_id = content['msg_id']
681 engine_uuid = content['engine_id']
665 engine_uuid = content['engine_id']
682 eid = self.by_ident[engine_uuid]
666 eid = self.by_ident[engine_uuid]
683
667
684 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
668 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
685 # if msg_id in self.mia:
669 # if msg_id in self.mia:
686 # self.mia.remove(msg_id)
670 # self.mia.remove(msg_id)
687 # else:
671 # else:
688 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
672 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
689
673
690 self.tasks[eid].append(msg_id)
674 self.tasks[eid].append(msg_id)
691 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
675 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
692 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
676 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
693
677
694 def mia_task_request(self, idents, msg):
678 def mia_task_request(self, idents, msg):
695 raise NotImplementedError
679 raise NotImplementedError
696 client_id = idents[0]
680 client_id = idents[0]
697 # content = dict(mia=self.mia,status='ok')
681 # content = dict(mia=self.mia,status='ok')
698 # self.session.send('mia_reply', content=content, idents=client_id)
682 # self.session.send('mia_reply', content=content, idents=client_id)
699
683
700
684
701 #--------------------- IOPub Traffic ------------------------------
685 #--------------------- IOPub Traffic ------------------------------
702
686
def save_iopub_message(self, topics, msg):
    """Persist one IOPub message into the db record of its parent request."""
    # print (topics)
    try:
        msg = self.session.unpack_message(msg, content=True)
    except:
        self.log.error("iopub::invalid IOPub message", exc_info=True)
        return

    parent = msg['parent_header']
    if not parent:
        self.log.error("iopub::invalid IOPub message: %s"%msg)
        return
    msg_id = parent['msg_id']
    msg_type = msg['msg_type']
    content = msg['content']

    # the parent request must already have a record in the db
    try:
        rec = self.db.get_record(msg_id)
    except:
        self.log.error("iopub::IOPub message has invalid parent", exc_info=True)
        return

    update = {}
    if msg_type == 'stream':
        # append to whatever stream output was already accumulated
        name = content['name']
        update[name] = (rec[name] or '') + content['data']
    elif msg_type == 'pyerr':
        update['pyerr'] = content
    else:
        update[msg_type] = content['data']

    self.db.update_record(msg_id, update)
739
723
740
724
741
725
742 #-------------------------------------------------------------------------
726 #-------------------------------------------------------------------------
743 # Registration requests
727 # Registration requests
744 #-------------------------------------------------------------------------
728 #-------------------------------------------------------------------------
745
729
def connection_request(self, client_id, msg):
    """Reply with connection addresses for clients."""
    self.log.info("client::client %s connected"%client_id)
    content = dict(status='ok')
    content.update(self.client_info)
    # JSON requires string keys; engine ids in keytable are ints
    content['engines'] = dict((str(eid), uuid)
                              for eid, uuid in self.keytable.iteritems())
    self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
756
740
def register_engine(self, reg, msg):
    """Register a new engine, and create the socket(s) necessary.

    Replies on the query channel with either the assigned engine id
    (status 'ok') or a wrapped exception when the requested queue or
    heartbeat identity is already in use.  On success the registration
    is parked in ``incoming_registrations`` until the engine's heart
    starts beating (or a purge timer fires).
    """
    content = msg['content']
    try:
        queue = content['queue']
    except KeyError:
        self.log.error("registration::queue not specified", exc_info=True)
        return
    heart = content.get('heartbeat', None)
    eid = self._next_id
    # print (eid, queue, reg, heart)

    self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))

    def duplicate(errmsg):
        # raise+catch so error.wrap_exception captures a real traceback
        try:
            raise KeyError(errmsg)
        except:
            self.log.error(errmsg, exc_info=True)
            return error.wrap_exception()

    content = dict(id=eid,status='ok')
    content.update(self.engine_info)
    # check if requesting available IDs:
    if queue in self.by_ident:
        content = duplicate("queue_id %r in use"%queue)
    elif heart in self.hearts: # need to check unique hearts?
        content = duplicate("heart_id %r in use"%heart)
    else:
        # also reject collisions with registrations still waiting on
        # their first heartbeat
        for h, pack in self.incoming_registrations.iteritems():
            if heart == h:
                content = duplicate("heart_id %r in use"%heart)
                break
            elif queue == pack[1]:
                content = duplicate("queue_id %r in use"%queue)
                break

    msg = self.session.send(self.query, "registration_reply",
            content=content,
            ident=reg)

    if content['status'] == 'ok':
        if heart in self.heartmonitor.hearts:
            # already beating: finish registration immediately
            self.incoming_registrations[heart] = (eid,queue,reg[0],None)
            self.finish_registration(heart)
        else:
            # wait for the first heartbeat, purging if it never arrives
            purge = lambda : self._purge_stalled_registration(heart)
            dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
            dc.start()
            self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
    else:
        self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
    return eid
821
805
def unregister_engine(self, ident, msg):
    """Unregister an engine that explicitly requested to leave."""
    try:
        eid = msg['content']['id']
    except:
        self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
        return
    self.log.info("registration::unregister_engine(%s)"%eid)
    # print (eid)
    # capture the notification content before tearing the record down
    content = dict(id=eid, queue=self.engines[eid].queue)
    self.ids.remove(eid)
    engine_uuid = self.keytable.pop(eid)
    connector = self.engines.pop(eid)
    self.hearts.pop(connector.heartbeat)
    self.by_ident.pop(connector.queue)
    self.completed.pop(eid)
    self._handle_stranded_msgs(eid, engine_uuid)
    ############## TODO: HANDLE IT ################

    if self.notifier:
        self.session.send(self.notifier, "unregistration_notification", content=content)
843
827
def _handle_stranded_msgs(self, eid, uuid):
    """Handle messages known to be on an engine when the engine unregisters.

    It is possible that this will fire prematurely - that is, an engine will
    go down after completing a result, and the client will be notified
    that the result failed and later receive the actual result.
    """
    for msg_id in self.queues.pop(eid):
        self.pending.remove(msg_id)
        self.all_completed.add(msg_id)
        # raise+catch so wrap_exception records a genuine traceback
        try:
            raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
        except:
            content = error.wrap_exception()
        # build a fake header:
        header = {'engine': uuid,
                  'date': datetime.now().strftime(ISO8601)}
        rec = dict(result_content=content, result_header=header, result_buffers=[],
                   completed=header['date'], engine_uuid=uuid)
        self.db.update_record(msg_id, rec)
869
853
def finish_registration(self, heart):
    """Second half of engine registration, run once our HeartMonitor has
    received a beat from the Engine's Heart."""
    try:
        (eid, queue, reg, purge) = self.incoming_registrations.pop(heart)
    except KeyError:
        self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
        return
    self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
    # cancel the stalled-registration purge timer, if one was armed
    if purge is not None:
        purge.stop()
    self.ids.add(eid)
    self.keytable[eid] = queue
    # the control identity is the same as the queue identity
    self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
                        control=queue, heartbeat=heart)
    self.by_ident[queue] = eid
    self.queues[eid] = []
    self.tasks[eid] = []
    self.completed[eid] = []
    self.hearts[heart] = eid
    if self.notifier:
        self.session.send(self.notifier, "registration_notification",
                          content=dict(id=eid, queue=self.engines[eid].queue))
    self.log.info("engine::Engine Connected: %i"%eid)
895
879
896 def _purge_stalled_registration(self, heart):
880 def _purge_stalled_registration(self, heart):
897 if heart in self.incoming_registrations:
881 if heart in self.incoming_registrations:
898 eid = self.incoming_registrations.pop(heart)[0]
882 eid = self.incoming_registrations.pop(heart)[0]
899 self.log.info("registration::purging stalled registration: %i"%eid)
883 self.log.info("registration::purging stalled registration: %i"%eid)
900 else:
884 else:
901 pass
885 pass
902
886
903 #-------------------------------------------------------------------------
887 #-------------------------------------------------------------------------
904 # Client Requests
888 # Client Requests
905 #-------------------------------------------------------------------------
889 #-------------------------------------------------------------------------
906
890
def shutdown_request(self, client_id, msg):
    """Acknowledge a shutdown request, then schedule the actual shutdown."""
    self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
    # give the reply a second to reach the client before exiting
    dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
    dc.start()
918
902
919 def _shutdown(self):
903 def _shutdown(self):
920 self.log.info("hub::hub shutting down.")
904 self.log.info("hub::hub shutting down.")
921 time.sleep(0.1)
905 time.sleep(0.1)
922 sys.exit(0)
906 sys.exit(0)
923
907
924
908
def check_load(self, client_id, msg):
    """Reply with the number of outstanding jobs (queue + tasks) per target engine."""
    try:
        targets = self._validate_targets(msg['content']['targets'])
    except:
        self.session.send(self.query, "hub_error",
                content=error.wrap_exception(), ident=client_id)
        return

    reply = dict(status='ok')
    for t in targets:
        reply[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
    self.session.send(self.query, "load_reply", content=reply, ident=client_id)
941
925
942
926
def queue_status(self, client_id, msg):
    """Return the Queue status of one or more targets.
    if verbose: return the msg_ids
    else: return len of each type.
    keys: queue (pending MUX jobs)
        tasks (pending Task jobs)
        completed (finished jobs from both queues)"""
    content = msg['content']
    try:
        targets = self._validate_targets(content['targets'])
    except:
        self.session.send(self.query, "hub_error",
                content=error.wrap_exception(), ident=client_id)
        return
    verbose = content.get('verbose', False)
    reply = dict(status='ok')
    for t in targets:
        queue = self.queues[t]
        completed = self.completed[t]
        tasks = self.tasks[t]
        if not verbose:
            # report counts only, not the msg_id lists themselves
            queue = len(queue)
            completed = len(completed)
            tasks = len(tasks)
        reply[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
    self.session.send(self.query, "queue_reply", content=reply, ident=client_id)
972
956
def purge_results(self, client_id, msg):
    """Purge results from memory. This method is more valuable before we move
    to a DB based message storage mechanism."""
    content = msg['content']
    msg_ids = content.get('msg_ids', [])
    reply = dict(status='ok')
    if msg_ids == 'all':
        # drop every completed record at once
        self.db.drop_matching_records(dict(completed={'$ne':None}))
    else:
        for msg_id in msg_ids:
            if msg_id in self.all_completed:
                self.db.drop_record(msg_id)
            elif msg_id in self.pending:
                # pending msgs are reported as errors, but the scan continues
                try:
                    raise IndexError("msg pending: %r"%msg_id)
                except:
                    reply = error.wrap_exception()
            else:
                # an unknown msg_id aborts the scan
                try:
                    raise IndexError("No such msg: %r"%msg_id)
                except:
                    reply = error.wrap_exception()
                break
    for eid in content.get('engine_ids', []):
        if eid not in self.engines:
            # an unknown engine id aborts the engine scan
            try:
                raise IndexError("No such engine: %i"%eid)
            except:
                reply = error.wrap_exception()
            break
        msg_ids = self.completed.pop(eid)
        uid = self.engines[eid].queue
        self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))

    self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1010
994
1011 def resubmit_task(self, client_id, msg, buffers):
995 def resubmit_task(self, client_id, msg, buffers):
1012 """Resubmit a task."""
996 """Resubmit a task."""
1013 raise NotImplementedError
997 raise NotImplementedError
1014
998
def get_results(self, client_id, msg):
    """Get the result of 1 or more messages."""
    request = msg['content']
    msg_ids = sorted(set(request['msg_ids']))
    statusonly = request.get('status_only', False)
    pending = []
    completed = []
    reply = dict(status='ok', pending=pending, completed=completed)
    buffers = []
    if not statusonly:
        reply['results'] = {}
        # fetch every requested record in one db query
        records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
    for msg_id in msg_ids:
        if msg_id in self.pending:
            pending.append(msg_id)
        elif msg_id in self.all_completed:
            completed.append(msg_id)
            if not statusonly:
                rec = records[msg_id]
                io_dict = dict((key, rec[key])
                               for key in 'pyin pyout pyerr stdout stderr'.split())
                reply[msg_id] = { 'result_content': rec['result_content'],
                                  'header': rec['header'],
                                  'result_header' : rec['result_header'],
                                  'io' : io_dict,
                                }
                if rec['result_buffers']:
                    buffers.extend(map(str, rec['result_buffers']))
        else:
            # an unknown msg_id aborts the scan with an error reply
            try:
                raise KeyError('No such message: '+msg_id)
            except:
                reply = error.wrap_exception()
            break
    self.session.send(self.query, "result_reply", content=reply,
            parent=msg, ident=client_id,
            buffers=buffers)
1055
1039
General Comments 0
You need to be logged in to leave comments. Login now