##// END OF EJS Templates
handle datetime objects in Session...
MinRK -
Show More
@@ -1,1353 +1,1351 b''
1 """A semi-synchronous Client for the ZMQ cluster"""
1 """A semi-synchronous Client for the ZMQ cluster"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import os
13 import os
14 import json
14 import json
15 import time
15 import time
16 import warnings
16 import warnings
17 from datetime import datetime
17 from datetime import datetime
18 from getpass import getpass
18 from getpass import getpass
19 from pprint import pprint
19 from pprint import pprint
20
20
21 pjoin = os.path.join
21 pjoin = os.path.join
22
22
23 import zmq
23 import zmq
24 # from zmq.eventloop import ioloop, zmqstream
24 # from zmq.eventloop import ioloop, zmqstream
25
25
26 from IPython.utils.jsonutil import extract_dates
27 from IPython.utils.path import get_ipython_dir
26 from IPython.utils.path import get_ipython_dir
28 from IPython.utils.traitlets import (HasTraits, Int, Instance, Unicode,
27 from IPython.utils.traitlets import (HasTraits, Int, Instance, Unicode,
29 Dict, List, Bool, Set)
28 Dict, List, Bool, Set)
30 from IPython.external.decorator import decorator
29 from IPython.external.decorator import decorator
31 from IPython.external.ssh import tunnel
30 from IPython.external.ssh import tunnel
32
31
33 from IPython.parallel import error
32 from IPython.parallel import error
34 from IPython.parallel import util
33 from IPython.parallel import util
35
34
36 from IPython.zmq.session import Session, Message
35 from IPython.zmq.session import Session, Message
37
36
38 from .asyncresult import AsyncResult, AsyncHubResult
37 from .asyncresult import AsyncResult, AsyncHubResult
39 from IPython.core.newapplication import ProfileDir, ProfileDirError
38 from IPython.core.newapplication import ProfileDir, ProfileDirError
40 from .view import DirectView, LoadBalancedView
39 from .view import DirectView, LoadBalancedView
41
40
42 #--------------------------------------------------------------------------
41 #--------------------------------------------------------------------------
43 # Decorators for Client methods
42 # Decorators for Client methods
44 #--------------------------------------------------------------------------
43 #--------------------------------------------------------------------------
45
44
@decorator
def spin_first(meth, client, *args, **kwargs):
    """Synchronize client state before invoking the wrapped method.

    Calls ``client.spin()`` (flushing incoming results and registration
    changes) and then delegates to *meth* unchanged.
    """
    client.spin()
    return meth(client, *args, **kwargs)
51
50
52
51
53 #--------------------------------------------------------------------------
52 #--------------------------------------------------------------------------
54 # Classes
53 # Classes
55 #--------------------------------------------------------------------------
54 #--------------------------------------------------------------------------
56
55
class Metadata(dict):
    """Subclass of dict for initializing metadata values.

    Attribute access works on keys.

    These objects have a strict set of keys - errors will raise if you try
    to add new keys.
    """
    def __init__(self, *args, **kwargs):
        dict.__init__(self)
        # The default entries double as the closed set of legal keys:
        # after this, __setitem__/__setattr__ refuse anything else.
        md = {'msg_id' : None,
              'submitted' : None,
              'started' : None,
              'completed' : None,
              'received' : None,
              'engine_uuid' : None,
              'engine_id' : None,
              'follow' : None,
              'after' : None,
              'status' : None,

              'pyin' : None,
              'pyout' : None,
              'pyerr' : None,
              'stdout' : '',
              'stderr' : '',
            }
        self.update(md)
        # NOTE(review): dict.update bypasses the strict __setitem__ below,
        # so constructor arguments are not key-checked - confirm this is
        # intentional before tightening.
        self.update(dict(*args, **kwargs))

    def __getattr__(self, key):
        """getattr aliased to getitem"""
        # `key in self` instead of `key in self.iterkeys()`: avoids
        # building a throwaway iterator, and dict.iterkeys() does not
        # exist on Python 3 (where its absence would recurse back into
        # __getattr__ forever).
        if key in self:
            return self[key]
        else:
            raise AttributeError(key)

    def __setattr__(self, key, value):
        """setattr aliased to setitem, with strict key enforcement"""
        if key in self:
            self[key] = value
        else:
            raise AttributeError(key)

    def __setitem__(self, key, value):
        """strict static key enforcement"""
        if key in self:
            dict.__setitem__(self, key, value)
        else:
            raise KeyError(key)
107
106
108
107
109 class Client(HasTraits):
108 class Client(HasTraits):
110 """A semi-synchronous client to the IPython ZMQ cluster
109 """A semi-synchronous client to the IPython ZMQ cluster
111
110
112 Parameters
111 Parameters
113 ----------
112 ----------
114
113
115 url_or_file : bytes; zmq url or path to ipcontroller-client.json
114 url_or_file : bytes; zmq url or path to ipcontroller-client.json
116 Connection information for the Hub's registration. If a json connector
115 Connection information for the Hub's registration. If a json connector
117 file is given, then likely no further configuration is necessary.
116 file is given, then likely no further configuration is necessary.
118 [Default: use profile]
117 [Default: use profile]
119 profile : bytes
118 profile : bytes
120 The name of the Cluster profile to be used to find connector information.
119 The name of the Cluster profile to be used to find connector information.
121 [Default: 'default']
120 [Default: 'default']
122 context : zmq.Context
121 context : zmq.Context
123 Pass an existing zmq.Context instance, otherwise the client will create its own.
122 Pass an existing zmq.Context instance, otherwise the client will create its own.
124 username : bytes
123 username : bytes
125 set username to be passed to the Session object
124 set username to be passed to the Session object
126 debug : bool
125 debug : bool
127 flag for lots of message printing for debug purposes
126 flag for lots of message printing for debug purposes
128
127
129 #-------------- ssh related args ----------------
128 #-------------- ssh related args ----------------
130 # These are args for configuring the ssh tunnel to be used
129 # These are args for configuring the ssh tunnel to be used
131 # credentials are used to forward connections over ssh to the Controller
130 # credentials are used to forward connections over ssh to the Controller
132 # Note that the ip given in `addr` needs to be relative to sshserver
131 # Note that the ip given in `addr` needs to be relative to sshserver
133 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
132 # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
134 # and set sshserver as the same machine the Controller is on. However,
133 # and set sshserver as the same machine the Controller is on. However,
135 # the only requirement is that sshserver is able to see the Controller
134 # the only requirement is that sshserver is able to see the Controller
136 # (i.e. is within the same trusted network).
135 # (i.e. is within the same trusted network).
137
136
138 sshserver : str
137 sshserver : str
139 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
138 A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
140 If keyfile or password is specified, and this is not, it will default to
139 If keyfile or password is specified, and this is not, it will default to
141 the ip given in addr.
140 the ip given in addr.
142 sshkey : str; path to public ssh key file
141 sshkey : str; path to public ssh key file
143 This specifies a key to be used in ssh login, default None.
142 This specifies a key to be used in ssh login, default None.
144 Regular default ssh keys will be used without specifying this argument.
143 Regular default ssh keys will be used without specifying this argument.
145 password : str
144 password : str
146 Your ssh password to sshserver. Note that if this is left None,
145 Your ssh password to sshserver. Note that if this is left None,
147 you will be prompted for it if passwordless key based login is unavailable.
146 you will be prompted for it if passwordless key based login is unavailable.
148 paramiko : bool
147 paramiko : bool
149 flag for whether to use paramiko instead of shell ssh for tunneling.
148 flag for whether to use paramiko instead of shell ssh for tunneling.
150 [default: True on win32, False else]
149 [default: True on win32, False else]
151
150
152 ------- exec authentication args -------
151 ------- exec authentication args -------
153 If even localhost is untrusted, you can have some protection against
152 If even localhost is untrusted, you can have some protection against
154 unauthorized execution by using a key. Messages are still sent
153 unauthorized execution by using a key. Messages are still sent
155 as cleartext, so if someone can snoop your loopback traffic this will
154 as cleartext, so if someone can snoop your loopback traffic this will
156 not help against malicious attacks.
155 not help against malicious attacks.
157
156
158 exec_key : str
157 exec_key : str
159 an authentication key or file containing a key
158 an authentication key or file containing a key
160 default: None
159 default: None
161
160
162
161
163 Attributes
162 Attributes
164 ----------
163 ----------
165
164
166 ids : list of int engine IDs
165 ids : list of int engine IDs
167 requesting the ids attribute always synchronizes
166 requesting the ids attribute always synchronizes
168 the registration state. To request ids without synchronization,
167 the registration state. To request ids without synchronization,
169 use semi-private _ids attributes.
168 use semi-private _ids attributes.
170
169
171 history : list of msg_ids
170 history : list of msg_ids
172 a list of msg_ids, keeping track of all the execution
171 a list of msg_ids, keeping track of all the execution
173 messages you have submitted in order.
172 messages you have submitted in order.
174
173
175 outstanding : set of msg_ids
174 outstanding : set of msg_ids
176 a set of msg_ids that have been submitted, but whose
175 a set of msg_ids that have been submitted, but whose
177 results have not yet been received.
176 results have not yet been received.
178
177
179 results : dict
178 results : dict
180 a dict of all our results, keyed by msg_id
179 a dict of all our results, keyed by msg_id
181
180
182 block : bool
181 block : bool
183 determines default behavior when block not specified
182 determines default behavior when block not specified
184 in execution methods
183 in execution methods
185
184
186 Methods
185 Methods
187 -------
186 -------
188
187
189 spin
188 spin
190 flushes incoming results and registration state changes
189 flushes incoming results and registration state changes
191 control methods spin, and requesting `ids` also ensures up to date
190 control methods spin, and requesting `ids` also ensures up to date
192
191
193 wait
192 wait
194 wait on one or more msg_ids
193 wait on one or more msg_ids
195
194
196 execution methods
195 execution methods
197 apply
196 apply
198 legacy: execute, run
197 legacy: execute, run
199
198
200 data movement
199 data movement
201 push, pull, scatter, gather
200 push, pull, scatter, gather
202
201
203 query methods
202 query methods
204 queue_status, get_result, purge, result_status
203 queue_status, get_result, purge, result_status
205
204
206 control methods
205 control methods
207 abort, shutdown
206 abort, shutdown
208
207
209 """
208 """
210
209
211
210
212 block = Bool(False)
211 block = Bool(False)
213 outstanding = Set()
212 outstanding = Set()
214 results = Instance('collections.defaultdict', (dict,))
213 results = Instance('collections.defaultdict', (dict,))
215 metadata = Instance('collections.defaultdict', (Metadata,))
214 metadata = Instance('collections.defaultdict', (Metadata,))
216 history = List()
215 history = List()
217 debug = Bool(False)
216 debug = Bool(False)
218 profile=Unicode('default')
217 profile=Unicode('default')
219
218
220 _outstanding_dict = Instance('collections.defaultdict', (set,))
219 _outstanding_dict = Instance('collections.defaultdict', (set,))
221 _ids = List()
220 _ids = List()
222 _connected=Bool(False)
221 _connected=Bool(False)
223 _ssh=Bool(False)
222 _ssh=Bool(False)
224 _context = Instance('zmq.Context')
223 _context = Instance('zmq.Context')
225 _config = Dict()
224 _config = Dict()
226 _engines=Instance(util.ReverseDict, (), {})
225 _engines=Instance(util.ReverseDict, (), {})
227 # _hub_socket=Instance('zmq.Socket')
226 # _hub_socket=Instance('zmq.Socket')
228 _query_socket=Instance('zmq.Socket')
227 _query_socket=Instance('zmq.Socket')
229 _control_socket=Instance('zmq.Socket')
228 _control_socket=Instance('zmq.Socket')
230 _iopub_socket=Instance('zmq.Socket')
229 _iopub_socket=Instance('zmq.Socket')
231 _notification_socket=Instance('zmq.Socket')
230 _notification_socket=Instance('zmq.Socket')
232 _mux_socket=Instance('zmq.Socket')
231 _mux_socket=Instance('zmq.Socket')
233 _task_socket=Instance('zmq.Socket')
232 _task_socket=Instance('zmq.Socket')
234 _task_scheme=Unicode()
233 _task_scheme=Unicode()
235 _closed = False
234 _closed = False
236 _ignored_control_replies=Int(0)
235 _ignored_control_replies=Int(0)
237 _ignored_hub_replies=Int(0)
236 _ignored_hub_replies=Int(0)
238
237
239 def __init__(self, url_or_file=None, profile='default', profile_dir=None, ipython_dir=None,
238 def __init__(self, url_or_file=None, profile='default', profile_dir=None, ipython_dir=None,
240 context=None, username=None, debug=False, exec_key=None,
239 context=None, username=None, debug=False, exec_key=None,
241 sshserver=None, sshkey=None, password=None, paramiko=None,
240 sshserver=None, sshkey=None, password=None, paramiko=None,
242 timeout=10
241 timeout=10
243 ):
242 ):
244 super(Client, self).__init__(debug=debug, profile=profile)
243 super(Client, self).__init__(debug=debug, profile=profile)
245 if context is None:
244 if context is None:
246 context = zmq.Context.instance()
245 context = zmq.Context.instance()
247 self._context = context
246 self._context = context
248
247
249
248
250 self._setup_profile_dir(profile, profile_dir, ipython_dir)
249 self._setup_profile_dir(profile, profile_dir, ipython_dir)
251 if self._cd is not None:
250 if self._cd is not None:
252 if url_or_file is None:
251 if url_or_file is None:
253 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
252 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
254 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
253 assert url_or_file is not None, "I can't find enough information to connect to a hub!"\
255 " Please specify at least one of url_or_file or profile."
254 " Please specify at least one of url_or_file or profile."
256
255
257 try:
256 try:
258 util.validate_url(url_or_file)
257 util.validate_url(url_or_file)
259 except AssertionError:
258 except AssertionError:
260 if not os.path.exists(url_or_file):
259 if not os.path.exists(url_or_file):
261 if self._cd:
260 if self._cd:
262 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
261 url_or_file = os.path.join(self._cd.security_dir, url_or_file)
263 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
262 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
264 with open(url_or_file) as f:
263 with open(url_or_file) as f:
265 cfg = json.loads(f.read())
264 cfg = json.loads(f.read())
266 else:
265 else:
267 cfg = {'url':url_or_file}
266 cfg = {'url':url_or_file}
268
267
269 # sync defaults from args, json:
268 # sync defaults from args, json:
270 if sshserver:
269 if sshserver:
271 cfg['ssh'] = sshserver
270 cfg['ssh'] = sshserver
272 if exec_key:
271 if exec_key:
273 cfg['exec_key'] = exec_key
272 cfg['exec_key'] = exec_key
274 exec_key = cfg['exec_key']
273 exec_key = cfg['exec_key']
275 sshserver=cfg['ssh']
274 sshserver=cfg['ssh']
276 url = cfg['url']
275 url = cfg['url']
277 location = cfg.setdefault('location', None)
276 location = cfg.setdefault('location', None)
278 cfg['url'] = util.disambiguate_url(cfg['url'], location)
277 cfg['url'] = util.disambiguate_url(cfg['url'], location)
279 url = cfg['url']
278 url = cfg['url']
280
279
281 self._config = cfg
280 self._config = cfg
282
281
283 self._ssh = bool(sshserver or sshkey or password)
282 self._ssh = bool(sshserver or sshkey or password)
284 if self._ssh and sshserver is None:
283 if self._ssh and sshserver is None:
285 # default to ssh via localhost
284 # default to ssh via localhost
286 sshserver = url.split('://')[1].split(':')[0]
285 sshserver = url.split('://')[1].split(':')[0]
287 if self._ssh and password is None:
286 if self._ssh and password is None:
288 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
287 if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
289 password=False
288 password=False
290 else:
289 else:
291 password = getpass("SSH Password for %s: "%sshserver)
290 password = getpass("SSH Password for %s: "%sshserver)
292 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
291 ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
293 if exec_key is not None and os.path.isfile(exec_key):
292 if exec_key is not None and os.path.isfile(exec_key):
294 arg = 'keyfile'
293 arg = 'keyfile'
295 else:
294 else:
296 arg = 'key'
295 arg = 'key'
297 key_arg = {arg:exec_key}
296 key_arg = {arg:exec_key}
298 if username is None:
297 if username is None:
299 self.session = Session(**key_arg)
298 self.session = Session(**key_arg)
300 else:
299 else:
301 self.session = Session(username=username, **key_arg)
300 self.session = Session(username=username, **key_arg)
302 self._query_socket = self._context.socket(zmq.XREQ)
301 self._query_socket = self._context.socket(zmq.XREQ)
303 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
302 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
304 if self._ssh:
303 if self._ssh:
305 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
304 tunnel.tunnel_connection(self._query_socket, url, sshserver, **ssh_kwargs)
306 else:
305 else:
307 self._query_socket.connect(url)
306 self._query_socket.connect(url)
308
307
309 self.session.debug = self.debug
308 self.session.debug = self.debug
310
309
311 self._notification_handlers = {'registration_notification' : self._register_engine,
310 self._notification_handlers = {'registration_notification' : self._register_engine,
312 'unregistration_notification' : self._unregister_engine,
311 'unregistration_notification' : self._unregister_engine,
313 'shutdown_notification' : lambda msg: self.close(),
312 'shutdown_notification' : lambda msg: self.close(),
314 }
313 }
315 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
314 self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
316 'apply_reply' : self._handle_apply_reply}
315 'apply_reply' : self._handle_apply_reply}
317 self._connect(sshserver, ssh_kwargs, timeout)
316 self._connect(sshserver, ssh_kwargs, timeout)
318
317
    def __del__(self):
        """cleanup sockets, but _not_ context.

        The zmq context may be the shared zmq.Context.instance() (see
        __init__), so only this client's own sockets are closed here.
        """
        self.close()
322
321
    def _setup_profile_dir(self, profile, profile_dir, ipython_dir):
        """Resolve the cluster profile directory and store it as self._cd.

        An explicit `profile_dir` takes precedence over a named `profile`;
        self._cd is set to None when neither lookup succeeds.
        """
        if ipython_dir is None:
            ipython_dir = get_ipython_dir()
        if profile_dir is not None:
            try:
                self._cd = ProfileDir.find_profile_dir(profile_dir)
                return
            except ProfileDirError:
                # fall through to self._cd = None below
                pass
        elif profile is not None:
            try:
                self._cd = ProfileDir.find_profile_dir_by_name(
                    ipython_dir, profile)
                return
            except ProfileDirError:
                pass
        self._cd = None
340
339
341 def _update_engines(self, engines):
340 def _update_engines(self, engines):
342 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
341 """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
343 for k,v in engines.iteritems():
342 for k,v in engines.iteritems():
344 eid = int(k)
343 eid = int(k)
345 self._engines[eid] = bytes(v) # force not unicode
344 self._engines[eid] = bytes(v) # force not unicode
346 self._ids.append(eid)
345 self._ids.append(eid)
347 self._ids = sorted(self._ids)
346 self._ids = sorted(self._ids)
348 if sorted(self._engines.keys()) != range(len(self._engines)) and \
347 if sorted(self._engines.keys()) != range(len(self._engines)) and \
349 self._task_scheme == 'pure' and self._task_socket:
348 self._task_scheme == 'pure' and self._task_socket:
350 self._stop_scheduling_tasks()
349 self._stop_scheduling_tasks()
351
350
352 def _stop_scheduling_tasks(self):
351 def _stop_scheduling_tasks(self):
353 """Stop scheduling tasks because an engine has been unregistered
352 """Stop scheduling tasks because an engine has been unregistered
354 from a pure ZMQ scheduler.
353 from a pure ZMQ scheduler.
355 """
354 """
356 self._task_socket.close()
355 self._task_socket.close()
357 self._task_socket = None
356 self._task_socket = None
358 msg = "An engine has been unregistered, and we are using pure " +\
357 msg = "An engine has been unregistered, and we are using pure " +\
359 "ZMQ task scheduling. Task farming will be disabled."
358 "ZMQ task scheduling. Task farming will be disabled."
360 if self.outstanding:
359 if self.outstanding:
361 msg += " If you were running tasks when this happened, " +\
360 msg += " If you were running tasks when this happened, " +\
362 "some `outstanding` msg_ids may never resolve."
361 "some `outstanding` msg_ids may never resolve."
363 warnings.warn(msg, RuntimeWarning)
362 warnings.warn(msg, RuntimeWarning)
364
363
365 def _build_targets(self, targets):
364 def _build_targets(self, targets):
366 """Turn valid target IDs or 'all' into two lists:
365 """Turn valid target IDs or 'all' into two lists:
367 (int_ids, uuids).
366 (int_ids, uuids).
368 """
367 """
369 if not self._ids:
368 if not self._ids:
370 # flush notification socket if no engines yet, just in case
369 # flush notification socket if no engines yet, just in case
371 if not self.ids:
370 if not self.ids:
372 raise error.NoEnginesRegistered("Can't build targets without any engines")
371 raise error.NoEnginesRegistered("Can't build targets without any engines")
373
372
374 if targets is None:
373 if targets is None:
375 targets = self._ids
374 targets = self._ids
376 elif isinstance(targets, str):
375 elif isinstance(targets, str):
377 if targets.lower() == 'all':
376 if targets.lower() == 'all':
378 targets = self._ids
377 targets = self._ids
379 else:
378 else:
380 raise TypeError("%r not valid str target, must be 'all'"%(targets))
379 raise TypeError("%r not valid str target, must be 'all'"%(targets))
381 elif isinstance(targets, int):
380 elif isinstance(targets, int):
382 if targets < 0:
381 if targets < 0:
383 targets = self.ids[targets]
382 targets = self.ids[targets]
384 if targets not in self._ids:
383 if targets not in self._ids:
385 raise IndexError("No such engine: %i"%targets)
384 raise IndexError("No such engine: %i"%targets)
386 targets = [targets]
385 targets = [targets]
387
386
388 if isinstance(targets, slice):
387 if isinstance(targets, slice):
389 indices = range(len(self._ids))[targets]
388 indices = range(len(self._ids))[targets]
390 ids = self.ids
389 ids = self.ids
391 targets = [ ids[i] for i in indices ]
390 targets = [ ids[i] for i in indices ]
392
391
393 if not isinstance(targets, (tuple, list, xrange)):
392 if not isinstance(targets, (tuple, list, xrange)):
394 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
393 raise TypeError("targets by int/slice/collection of ints only, not %s"%(type(targets)))
395
394
396 return [self._engines[t] for t in targets], list(targets)
395 return [self._engines[t] for t in targets], list(targets)
397
396
    def _connect(self, sshserver, ssh_kwargs, timeout):
        """setup all our socket connections to the cluster. This is called from
        __init__.

        Sends a connection_request to the Hub over the query socket, then
        connects one socket per channel the Hub advertises in its reply.
        Raises error.TimeoutError if the Hub does not answer within
        `timeout` seconds.
        """
        # Maybe allow reconnecting?
        if self._connected:
            return
        self._connected=True

        def connect_socket(s, url):
            # rewrite the url relative to the Hub's location, and go
            # through the ssh tunnel when tunneling is enabled
            url = util.disambiguate_url(url, self._config['location'])
            if self._ssh:
                return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
            else:
                return s.connect(url)

        self.session.send(self._query_socket, 'connection_request')
        # wait (at most `timeout` seconds) for the Hub's reply
        r,w,x = zmq.select([self._query_socket],[],[], timeout)
        if not r:
            raise error.TimeoutError("Hub connection request timed out")
        idents,msg = self.session.recv(self._query_socket,mode=0)
        if self.debug:
            pprint(msg)
        msg = Message(msg)
        content = msg.content
        self._config['registration'] = dict(content)
        if content.status == 'ok':
            # connect each channel the Hub advertised in its reply
            if content.mux:
                self._mux_socket = self._context.socket(zmq.XREQ)
                self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._mux_socket, content.mux)
            if content.task:
                # content.task is a (scheme, address) pair
                self._task_scheme, task_addr = content.task
                self._task_socket = self._context.socket(zmq.XREQ)
                self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._task_socket, task_addr)
            if content.notification:
                self._notification_socket = self._context.socket(zmq.SUB)
                connect_socket(self._notification_socket, content.notification)
                # empty prefix: subscribe to all notifications
                self._notification_socket.setsockopt(zmq.SUBSCRIBE, b'')
            # if content.query:
            #     self._query_socket = self._context.socket(zmq.XREQ)
            #     self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
            #     connect_socket(self._query_socket, content.query)
            if content.control:
                self._control_socket = self._context.socket(zmq.XREQ)
                self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._control_socket, content.control)
            if content.iopub:
                self._iopub_socket = self._context.socket(zmq.SUB)
                self._iopub_socket.setsockopt(zmq.SUBSCRIBE, b'')
                self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
                connect_socket(self._iopub_socket, content.iopub)
            self._update_engines(dict(content.engines))
        else:
            self._connected = False
            raise Exception("Failed to connect!")
455
454
456 #--------------------------------------------------------------------------
455 #--------------------------------------------------------------------------
457 # handlers and callbacks for incoming messages
456 # handlers and callbacks for incoming messages
458 #--------------------------------------------------------------------------
457 #--------------------------------------------------------------------------
459
458
460 def _unwrap_exception(self, content):
459 def _unwrap_exception(self, content):
461 """unwrap exception, and remap engine_id to int."""
460 """unwrap exception, and remap engine_id to int."""
462 e = error.unwrap_exception(content)
461 e = error.unwrap_exception(content)
463 # print e.traceback
462 # print e.traceback
464 if e.engine_info:
463 if e.engine_info:
465 e_uuid = e.engine_info['engine_uuid']
464 e_uuid = e.engine_info['engine_uuid']
466 eid = self._engines[e_uuid]
465 eid = self._engines[e_uuid]
467 e.engine_info['engine_id'] = eid
466 e.engine_info['engine_id'] = eid
468 return e
467 return e
469
468
470 def _extract_metadata(self, header, parent, content):
469 def _extract_metadata(self, header, parent, content):
471 md = {'msg_id' : parent['msg_id'],
470 md = {'msg_id' : parent['msg_id'],
472 'received' : datetime.now(),
471 'received' : datetime.now(),
473 'engine_uuid' : header.get('engine', None),
472 'engine_uuid' : header.get('engine', None),
474 'follow' : parent.get('follow', []),
473 'follow' : parent.get('follow', []),
475 'after' : parent.get('after', []),
474 'after' : parent.get('after', []),
476 'status' : content['status'],
475 'status' : content['status'],
477 }
476 }
478
477
479 if md['engine_uuid'] is not None:
478 if md['engine_uuid'] is not None:
480 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
479 md['engine_id'] = self._engines.get(md['engine_uuid'], None)
481
480
482 if 'date' in parent:
481 if 'date' in parent:
483 md['submitted'] = parent['date']
482 md['submitted'] = parent['date']
484 if 'started' in header:
483 if 'started' in header:
485 md['started'] = header['started']
484 md['started'] = header['started']
486 if 'date' in header:
485 if 'date' in header:
487 md['completed'] = header['date']
486 md['completed'] = header['date']
488 return md
487 return md
489
488
490 def _register_engine(self, msg):
489 def _register_engine(self, msg):
491 """Register a new engine, and update our connection info."""
490 """Register a new engine, and update our connection info."""
492 content = msg['content']
491 content = msg['content']
493 eid = content['id']
492 eid = content['id']
494 d = {eid : content['queue']}
493 d = {eid : content['queue']}
495 self._update_engines(d)
494 self._update_engines(d)
496
495
497 def _unregister_engine(self, msg):
496 def _unregister_engine(self, msg):
498 """Unregister an engine that has died."""
497 """Unregister an engine that has died."""
499 content = msg['content']
498 content = msg['content']
500 eid = int(content['id'])
499 eid = int(content['id'])
501 if eid in self._ids:
500 if eid in self._ids:
502 self._ids.remove(eid)
501 self._ids.remove(eid)
503 uuid = self._engines.pop(eid)
502 uuid = self._engines.pop(eid)
504
503
505 self._handle_stranded_msgs(eid, uuid)
504 self._handle_stranded_msgs(eid, uuid)
506
505
507 if self._task_socket and self._task_scheme == 'pure':
506 if self._task_socket and self._task_scheme == 'pure':
508 self._stop_scheduling_tasks()
507 self._stop_scheduling_tasks()
509
508
510 def _handle_stranded_msgs(self, eid, uuid):
509 def _handle_stranded_msgs(self, eid, uuid):
511 """Handle messages known to be on an engine when the engine unregisters.
510 """Handle messages known to be on an engine when the engine unregisters.
512
511
513 It is possible that this will fire prematurely - that is, an engine will
512 It is possible that this will fire prematurely - that is, an engine will
514 go down after completing a result, and the client will be notified
513 go down after completing a result, and the client will be notified
515 of the unregistration and later receive the successful result.
514 of the unregistration and later receive the successful result.
516 """
515 """
517
516
518 outstanding = self._outstanding_dict[uuid]
517 outstanding = self._outstanding_dict[uuid]
519
518
520 for msg_id in list(outstanding):
519 for msg_id in list(outstanding):
521 if msg_id in self.results:
520 if msg_id in self.results:
522 # we already
521 # we already
523 continue
522 continue
524 try:
523 try:
525 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
524 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
526 except:
525 except:
527 content = error.wrap_exception()
526 content = error.wrap_exception()
528 # build a fake message:
527 # build a fake message:
529 parent = {}
528 parent = {}
530 header = {}
529 header = {}
531 parent['msg_id'] = msg_id
530 parent['msg_id'] = msg_id
532 header['engine'] = uuid
531 header['engine'] = uuid
533 header['date'] = datetime.now()
532 header['date'] = datetime.now()
534 msg = dict(parent_header=parent, header=header, content=content)
533 msg = dict(parent_header=parent, header=header, content=content)
535 self._handle_apply_reply(msg)
534 self._handle_apply_reply(msg)
536
535
537 def _handle_execute_reply(self, msg):
536 def _handle_execute_reply(self, msg):
538 """Save the reply to an execute_request into our results.
537 """Save the reply to an execute_request into our results.
539
538
540 execute messages are never actually used. apply is used instead.
539 execute messages are never actually used. apply is used instead.
541 """
540 """
542
541
543 parent = msg['parent_header']
542 parent = msg['parent_header']
544 msg_id = parent['msg_id']
543 msg_id = parent['msg_id']
545 if msg_id not in self.outstanding:
544 if msg_id not in self.outstanding:
546 if msg_id in self.history:
545 if msg_id in self.history:
547 print ("got stale result: %s"%msg_id)
546 print ("got stale result: %s"%msg_id)
548 else:
547 else:
549 print ("got unknown result: %s"%msg_id)
548 print ("got unknown result: %s"%msg_id)
550 else:
549 else:
551 self.outstanding.remove(msg_id)
550 self.outstanding.remove(msg_id)
552 self.results[msg_id] = self._unwrap_exception(msg['content'])
551 self.results[msg_id] = self._unwrap_exception(msg['content'])
553
552
    def _handle_apply_reply(self, msg):
        """Save the reply to an apply_request into our results.

        Updates self.results, self.metadata, self.outstanding, and the
        per-engine outstanding set for the engine that answered.
        """
        parent = msg['parent_header']
        msg_id = parent['msg_id']
        if msg_id not in self.outstanding:
            if msg_id in self.history:
                # duplicate reply for a message we already resolved
                print ("got stale result: %s"%msg_id)
                print self.results[msg_id]
                print msg
            else:
                # reply for a message this client never sent
                print ("got unknown result: %s"%msg_id)
        else:
            self.outstanding.remove(msg_id)
            content = msg['content']
            header = msg['header']

            # construct metadata:
            md = self.metadata[msg_id]
            md.update(self._extract_metadata(header, parent, content))
            # is this redundant? (presumably self.metadata hands back a
            # shared record — TODO confirm against the metadata store)
            self.metadata[msg_id] = md

            # drop from the per-engine bookkeeping used on engine death
            e_outstanding = self._outstanding_dict[md['engine_uuid']]
            if msg_id in e_outstanding:
                e_outstanding.remove(msg_id)

            # construct result:
            if content['status'] == 'ok':
                self.results[msg_id] = util.unserialize_object(msg['buffers'])[0]
            elif content['status'] == 'aborted':
                self.results[msg_id] = error.TaskAborted(msg_id)
            elif content['status'] == 'resubmitted':
                # TODO: handle resubmission
                pass
            else:
                # remote failure: store the unwrapped exception as the result
                self.results[msg_id] = self._unwrap_exception(content)
590
589
591 def _flush_notifications(self):
590 def _flush_notifications(self):
592 """Flush notifications of engine registrations waiting
591 """Flush notifications of engine registrations waiting
593 in ZMQ queue."""
592 in ZMQ queue."""
594 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
593 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
595 while msg is not None:
594 while msg is not None:
596 if self.debug:
595 if self.debug:
597 pprint(msg)
596 pprint(msg)
598 msg_type = msg['msg_type']
597 msg_type = msg['msg_type']
599 handler = self._notification_handlers.get(msg_type, None)
598 handler = self._notification_handlers.get(msg_type, None)
600 if handler is None:
599 if handler is None:
601 raise Exception("Unhandled message type: %s"%msg.msg_type)
600 raise Exception("Unhandled message type: %s"%msg.msg_type)
602 else:
601 else:
603 handler(msg)
602 handler(msg)
604 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
603 idents,msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
605
604
606 def _flush_results(self, sock):
605 def _flush_results(self, sock):
607 """Flush task or queue results waiting in ZMQ queue."""
606 """Flush task or queue results waiting in ZMQ queue."""
608 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
607 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
609 while msg is not None:
608 while msg is not None:
610 if self.debug:
609 if self.debug:
611 pprint(msg)
610 pprint(msg)
612 msg_type = msg['msg_type']
611 msg_type = msg['msg_type']
613 handler = self._queue_handlers.get(msg_type, None)
612 handler = self._queue_handlers.get(msg_type, None)
614 if handler is None:
613 if handler is None:
615 raise Exception("Unhandled message type: %s"%msg.msg_type)
614 raise Exception("Unhandled message type: %s"%msg.msg_type)
616 else:
615 else:
617 handler(msg)
616 handler(msg)
618 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
617 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
619
618
620 def _flush_control(self, sock):
619 def _flush_control(self, sock):
621 """Flush replies from the control channel waiting
620 """Flush replies from the control channel waiting
622 in the ZMQ queue.
621 in the ZMQ queue.
623
622
624 Currently: ignore them."""
623 Currently: ignore them."""
625 if self._ignored_control_replies <= 0:
624 if self._ignored_control_replies <= 0:
626 return
625 return
627 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
626 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
628 while msg is not None:
627 while msg is not None:
629 self._ignored_control_replies -= 1
628 self._ignored_control_replies -= 1
630 if self.debug:
629 if self.debug:
631 pprint(msg)
630 pprint(msg)
632 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
631 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
633
632
634 def _flush_ignored_control(self):
633 def _flush_ignored_control(self):
635 """flush ignored control replies"""
634 """flush ignored control replies"""
636 while self._ignored_control_replies > 0:
635 while self._ignored_control_replies > 0:
637 self.session.recv(self._control_socket)
636 self.session.recv(self._control_socket)
638 self._ignored_control_replies -= 1
637 self._ignored_control_replies -= 1
639
638
640 def _flush_ignored_hub_replies(self):
639 def _flush_ignored_hub_replies(self):
641 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
640 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
642 while msg is not None:
641 while msg is not None:
643 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
642 ident,msg = self.session.recv(self._query_socket, mode=zmq.NOBLOCK)
644
643
645 def _flush_iopub(self, sock):
644 def _flush_iopub(self, sock):
646 """Flush replies from the iopub channel waiting
645 """Flush replies from the iopub channel waiting
647 in the ZMQ queue.
646 in the ZMQ queue.
648 """
647 """
649 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
648 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
650 while msg is not None:
649 while msg is not None:
651 if self.debug:
650 if self.debug:
652 pprint(msg)
651 pprint(msg)
653 parent = msg['parent_header']
652 parent = msg['parent_header']
654 msg_id = parent['msg_id']
653 msg_id = parent['msg_id']
655 content = msg['content']
654 content = msg['content']
656 header = msg['header']
655 header = msg['header']
657 msg_type = msg['msg_type']
656 msg_type = msg['msg_type']
658
657
659 # init metadata:
658 # init metadata:
660 md = self.metadata[msg_id]
659 md = self.metadata[msg_id]
661
660
662 if msg_type == 'stream':
661 if msg_type == 'stream':
663 name = content['name']
662 name = content['name']
664 s = md[name] or ''
663 s = md[name] or ''
665 md[name] = s + content['data']
664 md[name] = s + content['data']
666 elif msg_type == 'pyerr':
665 elif msg_type == 'pyerr':
667 md.update({'pyerr' : self._unwrap_exception(content)})
666 md.update({'pyerr' : self._unwrap_exception(content)})
668 elif msg_type == 'pyin':
667 elif msg_type == 'pyin':
669 md.update({'pyin' : content['code']})
668 md.update({'pyin' : content['code']})
670 else:
669 else:
671 md.update({msg_type : content.get('data', '')})
670 md.update({msg_type : content.get('data', '')})
672
671
673 # reduntant?
672 # reduntant?
674 self.metadata[msg_id] = md
673 self.metadata[msg_id] = md
675
674
676 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
675 idents,msg = self.session.recv(sock, mode=zmq.NOBLOCK)
677
676
678 #--------------------------------------------------------------------------
677 #--------------------------------------------------------------------------
679 # len, getitem
678 # len, getitem
680 #--------------------------------------------------------------------------
679 #--------------------------------------------------------------------------
681
680
682 def __len__(self):
681 def __len__(self):
683 """len(client) returns # of engines."""
682 """len(client) returns # of engines."""
684 return len(self.ids)
683 return len(self.ids)
685
684
    def __getitem__(self, key):
        """index access returns DirectView multiplexer objects

        Must be int, slice, or list/tuple/xrange of ints"""
        # NOTE(review): xrange makes this Python-2-specific (range in Py3)
        if not isinstance(key, (int, slice, tuple, list, xrange)):
            raise TypeError("key by int/slice/iterable of ints only, not %s"%(type(key)))
        else:
            # delegate construction of the view to direct_view()
            return self.direct_view(key)
694
693
695 #--------------------------------------------------------------------------
694 #--------------------------------------------------------------------------
696 # Begin public methods
695 # Begin public methods
697 #--------------------------------------------------------------------------
696 #--------------------------------------------------------------------------
698
697
699 @property
698 @property
700 def ids(self):
699 def ids(self):
701 """Always up-to-date ids property."""
700 """Always up-to-date ids property."""
702 self._flush_notifications()
701 self._flush_notifications()
703 # always copy:
702 # always copy:
704 return list(self._ids)
703 return list(self._ids)
705
704
706 def close(self):
705 def close(self):
707 if self._closed:
706 if self._closed:
708 return
707 return
709 snames = filter(lambda n: n.endswith('socket'), dir(self))
708 snames = filter(lambda n: n.endswith('socket'), dir(self))
710 for socket in map(lambda name: getattr(self, name), snames):
709 for socket in map(lambda name: getattr(self, name), snames):
711 if isinstance(socket, zmq.Socket) and not socket.closed:
710 if isinstance(socket, zmq.Socket) and not socket.closed:
712 socket.close()
711 socket.close()
713 self._closed = True
712 self._closed = True
714
713
715 def spin(self):
714 def spin(self):
716 """Flush any registration notifications and execution results
715 """Flush any registration notifications and execution results
717 waiting in the ZMQ queue.
716 waiting in the ZMQ queue.
718 """
717 """
719 if self._notification_socket:
718 if self._notification_socket:
720 self._flush_notifications()
719 self._flush_notifications()
721 if self._mux_socket:
720 if self._mux_socket:
722 self._flush_results(self._mux_socket)
721 self._flush_results(self._mux_socket)
723 if self._task_socket:
722 if self._task_socket:
724 self._flush_results(self._task_socket)
723 self._flush_results(self._task_socket)
725 if self._control_socket:
724 if self._control_socket:
726 self._flush_control(self._control_socket)
725 self._flush_control(self._control_socket)
727 if self._iopub_socket:
726 if self._iopub_socket:
728 self._flush_iopub(self._iopub_socket)
727 self._flush_iopub(self._iopub_socket)
729 if self._query_socket:
728 if self._query_socket:
730 self._flush_ignored_hub_replies()
729 self._flush_ignored_hub_replies()
731
730
732 def wait(self, jobs=None, timeout=-1):
731 def wait(self, jobs=None, timeout=-1):
733 """waits on one or more `jobs`, for up to `timeout` seconds.
732 """waits on one or more `jobs`, for up to `timeout` seconds.
734
733
735 Parameters
734 Parameters
736 ----------
735 ----------
737
736
738 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
737 jobs : int, str, or list of ints and/or strs, or one or more AsyncResult objects
739 ints are indices to self.history
738 ints are indices to self.history
740 strs are msg_ids
739 strs are msg_ids
741 default: wait on all outstanding messages
740 default: wait on all outstanding messages
742 timeout : float
741 timeout : float
743 a time in seconds, after which to give up.
742 a time in seconds, after which to give up.
744 default is -1, which means no timeout
743 default is -1, which means no timeout
745
744
746 Returns
745 Returns
747 -------
746 -------
748
747
749 True : when all msg_ids are done
748 True : when all msg_ids are done
750 False : timeout reached, some msg_ids still outstanding
749 False : timeout reached, some msg_ids still outstanding
751 """
750 """
752 tic = time.time()
751 tic = time.time()
753 if jobs is None:
752 if jobs is None:
754 theids = self.outstanding
753 theids = self.outstanding
755 else:
754 else:
756 if isinstance(jobs, (int, str, AsyncResult)):
755 if isinstance(jobs, (int, str, AsyncResult)):
757 jobs = [jobs]
756 jobs = [jobs]
758 theids = set()
757 theids = set()
759 for job in jobs:
758 for job in jobs:
760 if isinstance(job, int):
759 if isinstance(job, int):
761 # index access
760 # index access
762 job = self.history[job]
761 job = self.history[job]
763 elif isinstance(job, AsyncResult):
762 elif isinstance(job, AsyncResult):
764 map(theids.add, job.msg_ids)
763 map(theids.add, job.msg_ids)
765 continue
764 continue
766 theids.add(job)
765 theids.add(job)
767 if not theids.intersection(self.outstanding):
766 if not theids.intersection(self.outstanding):
768 return True
767 return True
769 self.spin()
768 self.spin()
770 while theids.intersection(self.outstanding):
769 while theids.intersection(self.outstanding):
771 if timeout >= 0 and ( time.time()-tic ) > timeout:
770 if timeout >= 0 and ( time.time()-tic ) > timeout:
772 break
771 break
773 time.sleep(1e-3)
772 time.sleep(1e-3)
774 self.spin()
773 self.spin()
775 return len(theids.intersection(self.outstanding)) == 0
774 return len(theids.intersection(self.outstanding)) == 0
776
775
777 #--------------------------------------------------------------------------
776 #--------------------------------------------------------------------------
778 # Control methods
777 # Control methods
779 #--------------------------------------------------------------------------
778 #--------------------------------------------------------------------------
780
779
781 @spin_first
780 @spin_first
782 def clear(self, targets=None, block=None):
781 def clear(self, targets=None, block=None):
783 """Clear the namespace in target(s)."""
782 """Clear the namespace in target(s)."""
784 block = self.block if block is None else block
783 block = self.block if block is None else block
785 targets = self._build_targets(targets)[0]
784 targets = self._build_targets(targets)[0]
786 for t in targets:
785 for t in targets:
787 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
786 self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
788 error = False
787 error = False
789 if block:
788 if block:
790 self._flush_ignored_control()
789 self._flush_ignored_control()
791 for i in range(len(targets)):
790 for i in range(len(targets)):
792 idents,msg = self.session.recv(self._control_socket,0)
791 idents,msg = self.session.recv(self._control_socket,0)
793 if self.debug:
792 if self.debug:
794 pprint(msg)
793 pprint(msg)
795 if msg['content']['status'] != 'ok':
794 if msg['content']['status'] != 'ok':
796 error = self._unwrap_exception(msg['content'])
795 error = self._unwrap_exception(msg['content'])
797 else:
796 else:
798 self._ignored_control_replies += len(targets)
797 self._ignored_control_replies += len(targets)
799 if error:
798 if error:
800 raise error
799 raise error
801
800
802
801
803 @spin_first
802 @spin_first
804 def abort(self, jobs=None, targets=None, block=None):
803 def abort(self, jobs=None, targets=None, block=None):
805 """Abort specific jobs from the execution queues of target(s).
804 """Abort specific jobs from the execution queues of target(s).
806
805
807 This is a mechanism to prevent jobs that have already been submitted
806 This is a mechanism to prevent jobs that have already been submitted
808 from executing.
807 from executing.
809
808
810 Parameters
809 Parameters
811 ----------
810 ----------
812
811
813 jobs : msg_id, list of msg_ids, or AsyncResult
812 jobs : msg_id, list of msg_ids, or AsyncResult
814 The jobs to be aborted
813 The jobs to be aborted
815
814
816
815
817 """
816 """
818 block = self.block if block is None else block
817 block = self.block if block is None else block
819 targets = self._build_targets(targets)[0]
818 targets = self._build_targets(targets)[0]
820 msg_ids = []
819 msg_ids = []
821 if isinstance(jobs, (basestring,AsyncResult)):
820 if isinstance(jobs, (basestring,AsyncResult)):
822 jobs = [jobs]
821 jobs = [jobs]
823 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
822 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
824 if bad_ids:
823 if bad_ids:
825 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
824 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
826 for j in jobs:
825 for j in jobs:
827 if isinstance(j, AsyncResult):
826 if isinstance(j, AsyncResult):
828 msg_ids.extend(j.msg_ids)
827 msg_ids.extend(j.msg_ids)
829 else:
828 else:
830 msg_ids.append(j)
829 msg_ids.append(j)
831 content = dict(msg_ids=msg_ids)
830 content = dict(msg_ids=msg_ids)
832 for t in targets:
831 for t in targets:
833 self.session.send(self._control_socket, 'abort_request',
832 self.session.send(self._control_socket, 'abort_request',
834 content=content, ident=t)
833 content=content, ident=t)
835 error = False
834 error = False
836 if block:
835 if block:
837 self._flush_ignored_control()
836 self._flush_ignored_control()
838 for i in range(len(targets)):
837 for i in range(len(targets)):
839 idents,msg = self.session.recv(self._control_socket,0)
838 idents,msg = self.session.recv(self._control_socket,0)
840 if self.debug:
839 if self.debug:
841 pprint(msg)
840 pprint(msg)
842 if msg['content']['status'] != 'ok':
841 if msg['content']['status'] != 'ok':
843 error = self._unwrap_exception(msg['content'])
842 error = self._unwrap_exception(msg['content'])
844 else:
843 else:
845 self._ignored_control_replies += len(targets)
844 self._ignored_control_replies += len(targets)
846 if error:
845 if error:
847 raise error
846 raise error
848
847
849 @spin_first
848 @spin_first
850 def shutdown(self, targets=None, restart=False, hub=False, block=None):
849 def shutdown(self, targets=None, restart=False, hub=False, block=None):
851 """Terminates one or more engine processes, optionally including the hub."""
850 """Terminates one or more engine processes, optionally including the hub."""
852 block = self.block if block is None else block
851 block = self.block if block is None else block
853 if hub:
852 if hub:
854 targets = 'all'
853 targets = 'all'
855 targets = self._build_targets(targets)[0]
854 targets = self._build_targets(targets)[0]
856 for t in targets:
855 for t in targets:
857 self.session.send(self._control_socket, 'shutdown_request',
856 self.session.send(self._control_socket, 'shutdown_request',
858 content={'restart':restart},ident=t)
857 content={'restart':restart},ident=t)
859 error = False
858 error = False
860 if block or hub:
859 if block or hub:
861 self._flush_ignored_control()
860 self._flush_ignored_control()
862 for i in range(len(targets)):
861 for i in range(len(targets)):
863 idents,msg = self.session.recv(self._control_socket, 0)
862 idents,msg = self.session.recv(self._control_socket, 0)
864 if self.debug:
863 if self.debug:
865 pprint(msg)
864 pprint(msg)
866 if msg['content']['status'] != 'ok':
865 if msg['content']['status'] != 'ok':
867 error = self._unwrap_exception(msg['content'])
866 error = self._unwrap_exception(msg['content'])
868 else:
867 else:
869 self._ignored_control_replies += len(targets)
868 self._ignored_control_replies += len(targets)
870
869
871 if hub:
870 if hub:
872 time.sleep(0.25)
871 time.sleep(0.25)
873 self.session.send(self._query_socket, 'shutdown_request')
872 self.session.send(self._query_socket, 'shutdown_request')
874 idents,msg = self.session.recv(self._query_socket, 0)
873 idents,msg = self.session.recv(self._query_socket, 0)
875 if self.debug:
874 if self.debug:
876 pprint(msg)
875 pprint(msg)
877 if msg['content']['status'] != 'ok':
876 if msg['content']['status'] != 'ok':
878 error = self._unwrap_exception(msg['content'])
877 error = self._unwrap_exception(msg['content'])
879
878
880 if error:
879 if error:
881 raise error
880 raise error
882
881
883 #--------------------------------------------------------------------------
882 #--------------------------------------------------------------------------
884 # Execution related methods
883 # Execution related methods
885 #--------------------------------------------------------------------------
884 #--------------------------------------------------------------------------
886
885
887 def _maybe_raise(self, result):
886 def _maybe_raise(self, result):
888 """wrapper for maybe raising an exception if apply failed."""
887 """wrapper for maybe raising an exception if apply failed."""
889 if isinstance(result, error.RemoteError):
888 if isinstance(result, error.RemoteError):
890 raise result
889 raise result
891
890
892 return result
891 return result
893
892
894 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
893 def send_apply_message(self, socket, f, args=None, kwargs=None, subheader=None, track=False,
895 ident=None):
894 ident=None):
896 """construct and send an apply message via a socket.
895 """construct and send an apply message via a socket.
897
896
898 This is the principal method with which all engine execution is performed by views.
897 This is the principal method with which all engine execution is performed by views.
899 """
898 """
900
899
901 assert not self._closed, "cannot use me anymore, I'm closed!"
900 assert not self._closed, "cannot use me anymore, I'm closed!"
902 # defaults:
901 # defaults:
903 args = args if args is not None else []
902 args = args if args is not None else []
904 kwargs = kwargs if kwargs is not None else {}
903 kwargs = kwargs if kwargs is not None else {}
905 subheader = subheader if subheader is not None else {}
904 subheader = subheader if subheader is not None else {}
906
905
907 # validate arguments
906 # validate arguments
908 if not callable(f):
907 if not callable(f):
909 raise TypeError("f must be callable, not %s"%type(f))
908 raise TypeError("f must be callable, not %s"%type(f))
910 if not isinstance(args, (tuple, list)):
909 if not isinstance(args, (tuple, list)):
911 raise TypeError("args must be tuple or list, not %s"%type(args))
910 raise TypeError("args must be tuple or list, not %s"%type(args))
912 if not isinstance(kwargs, dict):
911 if not isinstance(kwargs, dict):
913 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
912 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
914 if not isinstance(subheader, dict):
913 if not isinstance(subheader, dict):
915 raise TypeError("subheader must be dict, not %s"%type(subheader))
914 raise TypeError("subheader must be dict, not %s"%type(subheader))
916
915
917 bufs = util.pack_apply_message(f,args,kwargs)
916 bufs = util.pack_apply_message(f,args,kwargs)
918
917
919 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
918 msg = self.session.send(socket, "apply_request", buffers=bufs, ident=ident,
920 subheader=subheader, track=track)
919 subheader=subheader, track=track)
921
920
922 msg_id = msg['msg_id']
921 msg_id = msg['msg_id']
923 self.outstanding.add(msg_id)
922 self.outstanding.add(msg_id)
924 if ident:
923 if ident:
925 # possibly routed to a specific engine
924 # possibly routed to a specific engine
926 if isinstance(ident, list):
925 if isinstance(ident, list):
927 ident = ident[-1]
926 ident = ident[-1]
928 if ident in self._engines.values():
927 if ident in self._engines.values():
929 # save for later, in case of engine death
928 # save for later, in case of engine death
930 self._outstanding_dict[ident].add(msg_id)
929 self._outstanding_dict[ident].add(msg_id)
931 self.history.append(msg_id)
930 self.history.append(msg_id)
932 self.metadata[msg_id]['submitted'] = datetime.now()
931 self.metadata[msg_id]['submitted'] = datetime.now()
933
932
934 return msg
933 return msg
935
934
936 #--------------------------------------------------------------------------
935 #--------------------------------------------------------------------------
937 # construct a View object
936 # construct a View object
938 #--------------------------------------------------------------------------
937 #--------------------------------------------------------------------------
939
938
940 def load_balanced_view(self, targets=None):
939 def load_balanced_view(self, targets=None):
941 """construct a DirectView object.
940 """construct a DirectView object.
942
941
943 If no arguments are specified, create a LoadBalancedView
942 If no arguments are specified, create a LoadBalancedView
944 using all engines.
943 using all engines.
945
944
946 Parameters
945 Parameters
947 ----------
946 ----------
948
947
949 targets: list,slice,int,etc. [default: use all engines]
948 targets: list,slice,int,etc. [default: use all engines]
950 The subset of engines across which to load-balance
949 The subset of engines across which to load-balance
951 """
950 """
952 if targets is not None:
951 if targets is not None:
953 targets = self._build_targets(targets)[1]
952 targets = self._build_targets(targets)[1]
954 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
953 return LoadBalancedView(client=self, socket=self._task_socket, targets=targets)
955
954
956 def direct_view(self, targets='all'):
955 def direct_view(self, targets='all'):
957 """construct a DirectView object.
956 """construct a DirectView object.
958
957
959 If no targets are specified, create a DirectView
958 If no targets are specified, create a DirectView
960 using all engines.
959 using all engines.
961
960
962 Parameters
961 Parameters
963 ----------
962 ----------
964
963
965 targets: list,slice,int,etc. [default: use all engines]
964 targets: list,slice,int,etc. [default: use all engines]
966 The engines to use for the View
965 The engines to use for the View
967 """
966 """
968 single = isinstance(targets, int)
967 single = isinstance(targets, int)
969 targets = self._build_targets(targets)[1]
968 targets = self._build_targets(targets)[1]
970 if single:
969 if single:
971 targets = targets[0]
970 targets = targets[0]
972 return DirectView(client=self, socket=self._mux_socket, targets=targets)
971 return DirectView(client=self, socket=self._mux_socket, targets=targets)
973
972
974 #--------------------------------------------------------------------------
973 #--------------------------------------------------------------------------
975 # Query methods
974 # Query methods
976 #--------------------------------------------------------------------------
975 #--------------------------------------------------------------------------
977
976
978 @spin_first
977 @spin_first
979 def get_result(self, indices_or_msg_ids=None, block=None):
978 def get_result(self, indices_or_msg_ids=None, block=None):
980 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
979 """Retrieve a result by msg_id or history index, wrapped in an AsyncResult object.
981
980
982 If the client already has the results, no request to the Hub will be made.
981 If the client already has the results, no request to the Hub will be made.
983
982
984 This is a convenient way to construct AsyncResult objects, which are wrappers
983 This is a convenient way to construct AsyncResult objects, which are wrappers
985 that include metadata about execution, and allow for awaiting results that
984 that include metadata about execution, and allow for awaiting results that
986 were not submitted by this Client.
985 were not submitted by this Client.
987
986
988 It can also be a convenient way to retrieve the metadata associated with
987 It can also be a convenient way to retrieve the metadata associated with
989 blocking execution, since it always retrieves
988 blocking execution, since it always retrieves
990
989
991 Examples
990 Examples
992 --------
991 --------
993 ::
992 ::
994
993
995 In [10]: r = client.apply()
994 In [10]: r = client.apply()
996
995
997 Parameters
996 Parameters
998 ----------
997 ----------
999
998
1000 indices_or_msg_ids : integer history index, str msg_id, or list of either
999 indices_or_msg_ids : integer history index, str msg_id, or list of either
1001 The indices or msg_ids of indices to be retrieved
1000 The indices or msg_ids of indices to be retrieved
1002
1001
1003 block : bool
1002 block : bool
1004 Whether to wait for the result to be done
1003 Whether to wait for the result to be done
1005
1004
1006 Returns
1005 Returns
1007 -------
1006 -------
1008
1007
1009 AsyncResult
1008 AsyncResult
1010 A single AsyncResult object will always be returned.
1009 A single AsyncResult object will always be returned.
1011
1010
1012 AsyncHubResult
1011 AsyncHubResult
1013 A subclass of AsyncResult that retrieves results from the Hub
1012 A subclass of AsyncResult that retrieves results from the Hub
1014
1013
1015 """
1014 """
1016 block = self.block if block is None else block
1015 block = self.block if block is None else block
1017 if indices_or_msg_ids is None:
1016 if indices_or_msg_ids is None:
1018 indices_or_msg_ids = -1
1017 indices_or_msg_ids = -1
1019
1018
1020 if not isinstance(indices_or_msg_ids, (list,tuple)):
1019 if not isinstance(indices_or_msg_ids, (list,tuple)):
1021 indices_or_msg_ids = [indices_or_msg_ids]
1020 indices_or_msg_ids = [indices_or_msg_ids]
1022
1021
1023 theids = []
1022 theids = []
1024 for id in indices_or_msg_ids:
1023 for id in indices_or_msg_ids:
1025 if isinstance(id, int):
1024 if isinstance(id, int):
1026 id = self.history[id]
1025 id = self.history[id]
1027 if not isinstance(id, str):
1026 if not isinstance(id, str):
1028 raise TypeError("indices must be str or int, not %r"%id)
1027 raise TypeError("indices must be str or int, not %r"%id)
1029 theids.append(id)
1028 theids.append(id)
1030
1029
1031 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1030 local_ids = filter(lambda msg_id: msg_id in self.history or msg_id in self.results, theids)
1032 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1031 remote_ids = filter(lambda msg_id: msg_id not in local_ids, theids)
1033
1032
1034 if remote_ids:
1033 if remote_ids:
1035 ar = AsyncHubResult(self, msg_ids=theids)
1034 ar = AsyncHubResult(self, msg_ids=theids)
1036 else:
1035 else:
1037 ar = AsyncResult(self, msg_ids=theids)
1036 ar = AsyncResult(self, msg_ids=theids)
1038
1037
1039 if block:
1038 if block:
1040 ar.wait()
1039 ar.wait()
1041
1040
1042 return ar
1041 return ar
1043
1042
1044 @spin_first
1043 @spin_first
1045 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1044 def resubmit(self, indices_or_msg_ids=None, subheader=None, block=None):
1046 """Resubmit one or more tasks.
1045 """Resubmit one or more tasks.
1047
1046
1048 in-flight tasks may not be resubmitted.
1047 in-flight tasks may not be resubmitted.
1049
1048
1050 Parameters
1049 Parameters
1051 ----------
1050 ----------
1052
1051
1053 indices_or_msg_ids : integer history index, str msg_id, or list of either
1052 indices_or_msg_ids : integer history index, str msg_id, or list of either
1054 The indices or msg_ids of indices to be retrieved
1053 The indices or msg_ids of indices to be retrieved
1055
1054
1056 block : bool
1055 block : bool
1057 Whether to wait for the result to be done
1056 Whether to wait for the result to be done
1058
1057
1059 Returns
1058 Returns
1060 -------
1059 -------
1061
1060
1062 AsyncHubResult
1061 AsyncHubResult
1063 A subclass of AsyncResult that retrieves results from the Hub
1062 A subclass of AsyncResult that retrieves results from the Hub
1064
1063
1065 """
1064 """
1066 block = self.block if block is None else block
1065 block = self.block if block is None else block
1067 if indices_or_msg_ids is None:
1066 if indices_or_msg_ids is None:
1068 indices_or_msg_ids = -1
1067 indices_or_msg_ids = -1
1069
1068
1070 if not isinstance(indices_or_msg_ids, (list,tuple)):
1069 if not isinstance(indices_or_msg_ids, (list,tuple)):
1071 indices_or_msg_ids = [indices_or_msg_ids]
1070 indices_or_msg_ids = [indices_or_msg_ids]
1072
1071
1073 theids = []
1072 theids = []
1074 for id in indices_or_msg_ids:
1073 for id in indices_or_msg_ids:
1075 if isinstance(id, int):
1074 if isinstance(id, int):
1076 id = self.history[id]
1075 id = self.history[id]
1077 if not isinstance(id, str):
1076 if not isinstance(id, str):
1078 raise TypeError("indices must be str or int, not %r"%id)
1077 raise TypeError("indices must be str or int, not %r"%id)
1079 theids.append(id)
1078 theids.append(id)
1080
1079
1081 for msg_id in theids:
1080 for msg_id in theids:
1082 self.outstanding.discard(msg_id)
1081 self.outstanding.discard(msg_id)
1083 if msg_id in self.history:
1082 if msg_id in self.history:
1084 self.history.remove(msg_id)
1083 self.history.remove(msg_id)
1085 self.results.pop(msg_id, None)
1084 self.results.pop(msg_id, None)
1086 self.metadata.pop(msg_id, None)
1085 self.metadata.pop(msg_id, None)
1087 content = dict(msg_ids = theids)
1086 content = dict(msg_ids = theids)
1088
1087
1089 self.session.send(self._query_socket, 'resubmit_request', content)
1088 self.session.send(self._query_socket, 'resubmit_request', content)
1090
1089
1091 zmq.select([self._query_socket], [], [])
1090 zmq.select([self._query_socket], [], [])
1092 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1091 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1093 if self.debug:
1092 if self.debug:
1094 pprint(msg)
1093 pprint(msg)
1095 content = msg['content']
1094 content = msg['content']
1096 if content['status'] != 'ok':
1095 if content['status'] != 'ok':
1097 raise self._unwrap_exception(content)
1096 raise self._unwrap_exception(content)
1098
1097
1099 ar = AsyncHubResult(self, msg_ids=theids)
1098 ar = AsyncHubResult(self, msg_ids=theids)
1100
1099
1101 if block:
1100 if block:
1102 ar.wait()
1101 ar.wait()
1103
1102
1104 return ar
1103 return ar
1105
1104
1106 @spin_first
1105 @spin_first
1107 def result_status(self, msg_ids, status_only=True):
1106 def result_status(self, msg_ids, status_only=True):
1108 """Check on the status of the result(s) of the apply request with `msg_ids`.
1107 """Check on the status of the result(s) of the apply request with `msg_ids`.
1109
1108
1110 If status_only is False, then the actual results will be retrieved, else
1109 If status_only is False, then the actual results will be retrieved, else
1111 only the status of the results will be checked.
1110 only the status of the results will be checked.
1112
1111
1113 Parameters
1112 Parameters
1114 ----------
1113 ----------
1115
1114
1116 msg_ids : list of msg_ids
1115 msg_ids : list of msg_ids
1117 if int:
1116 if int:
1118 Passed as index to self.history for convenience.
1117 Passed as index to self.history for convenience.
1119 status_only : bool (default: True)
1118 status_only : bool (default: True)
1120 if False:
1119 if False:
1121 Retrieve the actual results of completed tasks.
1120 Retrieve the actual results of completed tasks.
1122
1121
1123 Returns
1122 Returns
1124 -------
1123 -------
1125
1124
1126 results : dict
1125 results : dict
1127 There will always be the keys 'pending' and 'completed', which will
1126 There will always be the keys 'pending' and 'completed', which will
1128 be lists of msg_ids that are incomplete or complete. If `status_only`
1127 be lists of msg_ids that are incomplete or complete. If `status_only`
1129 is False, then completed results will be keyed by their `msg_id`.
1128 is False, then completed results will be keyed by their `msg_id`.
1130 """
1129 """
1131 if not isinstance(msg_ids, (list,tuple)):
1130 if not isinstance(msg_ids, (list,tuple)):
1132 msg_ids = [msg_ids]
1131 msg_ids = [msg_ids]
1133
1132
1134 theids = []
1133 theids = []
1135 for msg_id in msg_ids:
1134 for msg_id in msg_ids:
1136 if isinstance(msg_id, int):
1135 if isinstance(msg_id, int):
1137 msg_id = self.history[msg_id]
1136 msg_id = self.history[msg_id]
1138 if not isinstance(msg_id, basestring):
1137 if not isinstance(msg_id, basestring):
1139 raise TypeError("msg_ids must be str, not %r"%msg_id)
1138 raise TypeError("msg_ids must be str, not %r"%msg_id)
1140 theids.append(msg_id)
1139 theids.append(msg_id)
1141
1140
1142 completed = []
1141 completed = []
1143 local_results = {}
1142 local_results = {}
1144
1143
1145 # comment this block out to temporarily disable local shortcut:
1144 # comment this block out to temporarily disable local shortcut:
1146 for msg_id in theids:
1145 for msg_id in theids:
1147 if msg_id in self.results:
1146 if msg_id in self.results:
1148 completed.append(msg_id)
1147 completed.append(msg_id)
1149 local_results[msg_id] = self.results[msg_id]
1148 local_results[msg_id] = self.results[msg_id]
1150 theids.remove(msg_id)
1149 theids.remove(msg_id)
1151
1150
1152 if theids: # some not locally cached
1151 if theids: # some not locally cached
1153 content = dict(msg_ids=theids, status_only=status_only)
1152 content = dict(msg_ids=theids, status_only=status_only)
1154 msg = self.session.send(self._query_socket, "result_request", content=content)
1153 msg = self.session.send(self._query_socket, "result_request", content=content)
1155 zmq.select([self._query_socket], [], [])
1154 zmq.select([self._query_socket], [], [])
1156 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1155 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1157 if self.debug:
1156 if self.debug:
1158 pprint(msg)
1157 pprint(msg)
1159 content = msg['content']
1158 content = msg['content']
1160 if content['status'] != 'ok':
1159 if content['status'] != 'ok':
1161 raise self._unwrap_exception(content)
1160 raise self._unwrap_exception(content)
1162 buffers = msg['buffers']
1161 buffers = msg['buffers']
1163 else:
1162 else:
1164 content = dict(completed=[],pending=[])
1163 content = dict(completed=[],pending=[])
1165
1164
1166 content['completed'].extend(completed)
1165 content['completed'].extend(completed)
1167
1166
1168 if status_only:
1167 if status_only:
1169 return content
1168 return content
1170
1169
1171 failures = []
1170 failures = []
1172 # load cached results into result:
1171 # load cached results into result:
1173 content.update(local_results)
1172 content.update(local_results)
1174 content = extract_dates(content)
1173
1175 # update cache with results:
1174 # update cache with results:
1176 for msg_id in sorted(theids):
1175 for msg_id in sorted(theids):
1177 if msg_id in content['completed']:
1176 if msg_id in content['completed']:
1178 rec = content[msg_id]
1177 rec = content[msg_id]
1179 parent = rec['header']
1178 parent = rec['header']
1180 header = rec['result_header']
1179 header = rec['result_header']
1181 rcontent = rec['result_content']
1180 rcontent = rec['result_content']
1182 iodict = rec['io']
1181 iodict = rec['io']
1183 if isinstance(rcontent, str):
1182 if isinstance(rcontent, str):
1184 rcontent = self.session.unpack(rcontent)
1183 rcontent = self.session.unpack(rcontent)
1185
1184
1186 md = self.metadata[msg_id]
1185 md = self.metadata[msg_id]
1187 md.update(self._extract_metadata(header, parent, rcontent))
1186 md.update(self._extract_metadata(header, parent, rcontent))
1188 md.update(iodict)
1187 md.update(iodict)
1189
1188
1190 if rcontent['status'] == 'ok':
1189 if rcontent['status'] == 'ok':
1191 res,buffers = util.unserialize_object(buffers)
1190 res,buffers = util.unserialize_object(buffers)
1192 else:
1191 else:
1193 print rcontent
1192 print rcontent
1194 res = self._unwrap_exception(rcontent)
1193 res = self._unwrap_exception(rcontent)
1195 failures.append(res)
1194 failures.append(res)
1196
1195
1197 self.results[msg_id] = res
1196 self.results[msg_id] = res
1198 content[msg_id] = res
1197 content[msg_id] = res
1199
1198
1200 if len(theids) == 1 and failures:
1199 if len(theids) == 1 and failures:
1201 raise failures[0]
1200 raise failures[0]
1202
1201
1203 error.collect_exceptions(failures, "result_status")
1202 error.collect_exceptions(failures, "result_status")
1204 return content
1203 return content
1205
1204
1206 @spin_first
1205 @spin_first
1207 def queue_status(self, targets='all', verbose=False):
1206 def queue_status(self, targets='all', verbose=False):
1208 """Fetch the status of engine queues.
1207 """Fetch the status of engine queues.
1209
1208
1210 Parameters
1209 Parameters
1211 ----------
1210 ----------
1212
1211
1213 targets : int/str/list of ints/strs
1212 targets : int/str/list of ints/strs
1214 the engines whose states are to be queried.
1213 the engines whose states are to be queried.
1215 default : all
1214 default : all
1216 verbose : bool
1215 verbose : bool
1217 Whether to return lengths only, or lists of ids for each element
1216 Whether to return lengths only, or lists of ids for each element
1218 """
1217 """
1219 engine_ids = self._build_targets(targets)[1]
1218 engine_ids = self._build_targets(targets)[1]
1220 content = dict(targets=engine_ids, verbose=verbose)
1219 content = dict(targets=engine_ids, verbose=verbose)
1221 self.session.send(self._query_socket, "queue_request", content=content)
1220 self.session.send(self._query_socket, "queue_request", content=content)
1222 idents,msg = self.session.recv(self._query_socket, 0)
1221 idents,msg = self.session.recv(self._query_socket, 0)
1223 if self.debug:
1222 if self.debug:
1224 pprint(msg)
1223 pprint(msg)
1225 content = msg['content']
1224 content = msg['content']
1226 status = content.pop('status')
1225 status = content.pop('status')
1227 if status != 'ok':
1226 if status != 'ok':
1228 raise self._unwrap_exception(content)
1227 raise self._unwrap_exception(content)
1229 content = util.rekey(content)
1228 content = util.rekey(content)
1230 if isinstance(targets, int):
1229 if isinstance(targets, int):
1231 return content[targets]
1230 return content[targets]
1232 else:
1231 else:
1233 return content
1232 return content
1234
1233
1235 @spin_first
1234 @spin_first
1236 def purge_results(self, jobs=[], targets=[]):
1235 def purge_results(self, jobs=[], targets=[]):
1237 """Tell the Hub to forget results.
1236 """Tell the Hub to forget results.
1238
1237
1239 Individual results can be purged by msg_id, or the entire
1238 Individual results can be purged by msg_id, or the entire
1240 history of specific targets can be purged.
1239 history of specific targets can be purged.
1241
1240
1242 Parameters
1241 Parameters
1243 ----------
1242 ----------
1244
1243
1245 jobs : str or list of str or AsyncResult objects
1244 jobs : str or list of str or AsyncResult objects
1246 the msg_ids whose results should be forgotten.
1245 the msg_ids whose results should be forgotten.
1247 targets : int/str/list of ints/strs
1246 targets : int/str/list of ints/strs
1248 The targets, by uuid or int_id, whose entire history is to be purged.
1247 The targets, by uuid or int_id, whose entire history is to be purged.
1249 Use `targets='all'` to scrub everything from the Hub's memory.
1248 Use `targets='all'` to scrub everything from the Hub's memory.
1250
1249
1251 default : None
1250 default : None
1252 """
1251 """
1253 if not targets and not jobs:
1252 if not targets and not jobs:
1254 raise ValueError("Must specify at least one of `targets` and `jobs`")
1253 raise ValueError("Must specify at least one of `targets` and `jobs`")
1255 if targets:
1254 if targets:
1256 targets = self._build_targets(targets)[1]
1255 targets = self._build_targets(targets)[1]
1257
1256
1258 # construct msg_ids from jobs
1257 # construct msg_ids from jobs
1259 msg_ids = []
1258 msg_ids = []
1260 if isinstance(jobs, (basestring,AsyncResult)):
1259 if isinstance(jobs, (basestring,AsyncResult)):
1261 jobs = [jobs]
1260 jobs = [jobs]
1262 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1261 bad_ids = filter(lambda obj: not isinstance(obj, (basestring, AsyncResult)), jobs)
1263 if bad_ids:
1262 if bad_ids:
1264 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1263 raise TypeError("Invalid msg_id type %r, expected str or AsyncResult"%bad_ids[0])
1265 for j in jobs:
1264 for j in jobs:
1266 if isinstance(j, AsyncResult):
1265 if isinstance(j, AsyncResult):
1267 msg_ids.extend(j.msg_ids)
1266 msg_ids.extend(j.msg_ids)
1268 else:
1267 else:
1269 msg_ids.append(j)
1268 msg_ids.append(j)
1270
1269
1271 content = dict(targets=targets, msg_ids=msg_ids)
1270 content = dict(targets=targets, msg_ids=msg_ids)
1272 self.session.send(self._query_socket, "purge_request", content=content)
1271 self.session.send(self._query_socket, "purge_request", content=content)
1273 idents, msg = self.session.recv(self._query_socket, 0)
1272 idents, msg = self.session.recv(self._query_socket, 0)
1274 if self.debug:
1273 if self.debug:
1275 pprint(msg)
1274 pprint(msg)
1276 content = msg['content']
1275 content = msg['content']
1277 if content['status'] != 'ok':
1276 if content['status'] != 'ok':
1278 raise self._unwrap_exception(content)
1277 raise self._unwrap_exception(content)
1279
1278
1280 @spin_first
1279 @spin_first
1281 def hub_history(self):
1280 def hub_history(self):
1282 """Get the Hub's history
1281 """Get the Hub's history
1283
1282
1284 Just like the Client, the Hub has a history, which is a list of msg_ids.
1283 Just like the Client, the Hub has a history, which is a list of msg_ids.
1285 This will contain the history of all clients, and, depending on configuration,
1284 This will contain the history of all clients, and, depending on configuration,
1286 may contain history across multiple cluster sessions.
1285 may contain history across multiple cluster sessions.
1287
1286
1288 Any msg_id returned here is a valid argument to `get_result`.
1287 Any msg_id returned here is a valid argument to `get_result`.
1289
1288
1290 Returns
1289 Returns
1291 -------
1290 -------
1292
1291
1293 msg_ids : list of strs
1292 msg_ids : list of strs
1294 list of all msg_ids, ordered by task submission time.
1293 list of all msg_ids, ordered by task submission time.
1295 """
1294 """
1296
1295
1297 self.session.send(self._query_socket, "history_request", content={})
1296 self.session.send(self._query_socket, "history_request", content={})
1298 idents, msg = self.session.recv(self._query_socket, 0)
1297 idents, msg = self.session.recv(self._query_socket, 0)
1299
1298
1300 if self.debug:
1299 if self.debug:
1301 pprint(msg)
1300 pprint(msg)
1302 content = msg['content']
1301 content = msg['content']
1303 if content['status'] != 'ok':
1302 if content['status'] != 'ok':
1304 raise self._unwrap_exception(content)
1303 raise self._unwrap_exception(content)
1305 else:
1304 else:
1306 return content['history']
1305 return content['history']
1307
1306
1308 @spin_first
1307 @spin_first
1309 def db_query(self, query, keys=None):
1308 def db_query(self, query, keys=None):
1310 """Query the Hub's TaskRecord database
1309 """Query the Hub's TaskRecord database
1311
1310
1312 This will return a list of task record dicts that match `query`
1311 This will return a list of task record dicts that match `query`
1313
1312
1314 Parameters
1313 Parameters
1315 ----------
1314 ----------
1316
1315
1317 query : mongodb query dict
1316 query : mongodb query dict
1318 The search dict. See mongodb query docs for details.
1317 The search dict. See mongodb query docs for details.
1319 keys : list of strs [optional]
1318 keys : list of strs [optional]
1320 The subset of keys to be returned. The default is to fetch everything but buffers.
1319 The subset of keys to be returned. The default is to fetch everything but buffers.
1321 'msg_id' will *always* be included.
1320 'msg_id' will *always* be included.
1322 """
1321 """
1323 if isinstance(keys, basestring):
1322 if isinstance(keys, basestring):
1324 keys = [keys]
1323 keys = [keys]
1325 content = dict(query=query, keys=keys)
1324 content = dict(query=query, keys=keys)
1326 self.session.send(self._query_socket, "db_request", content=content)
1325 self.session.send(self._query_socket, "db_request", content=content)
1327 idents, msg = self.session.recv(self._query_socket, 0)
1326 idents, msg = self.session.recv(self._query_socket, 0)
1328 if self.debug:
1327 if self.debug:
1329 pprint(msg)
1328 pprint(msg)
1330 content = msg['content']
1329 content = msg['content']
1331 if content['status'] != 'ok':
1330 if content['status'] != 'ok':
1332 raise self._unwrap_exception(content)
1331 raise self._unwrap_exception(content)
1333
1332
1334 records = content['records']
1333 records = content['records']
1334
1335 buffer_lens = content['buffer_lens']
1335 buffer_lens = content['buffer_lens']
1336 result_buffer_lens = content['result_buffer_lens']
1336 result_buffer_lens = content['result_buffer_lens']
1337 buffers = msg['buffers']
1337 buffers = msg['buffers']
1338 has_bufs = buffer_lens is not None
1338 has_bufs = buffer_lens is not None
1339 has_rbufs = result_buffer_lens is not None
1339 has_rbufs = result_buffer_lens is not None
1340 for i,rec in enumerate(records):
1340 for i,rec in enumerate(records):
1341 # unpack timestamps
1342 rec = extract_dates(rec)
1343 # relink buffers
1341 # relink buffers
1344 if has_bufs:
1342 if has_bufs:
1345 blen = buffer_lens[i]
1343 blen = buffer_lens[i]
1346 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1344 rec['buffers'], buffers = buffers[:blen],buffers[blen:]
1347 if has_rbufs:
1345 if has_rbufs:
1348 blen = result_buffer_lens[i]
1346 blen = result_buffer_lens[i]
1349 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1347 rec['result_buffers'], buffers = buffers[:blen],buffers[blen:]
1350
1348
1351 return records
1349 return records
1352
1350
1353 __all__ = [ 'Client' ]
1351 __all__ = [ 'Client' ]
@@ -1,1277 +1,1271 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller Hub with 0MQ
2 """The IPython Controller Hub with 0MQ
3 This is the master object that handles connections from engines and clients,
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
4 and monitors traffic through the various queues.
5 """
5 """
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010 The IPython Development Team
7 # Copyright (C) 2010 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import sys
18 import sys
19 import time
19 import time
20 from datetime import datetime
20 from datetime import datetime
21
21
22 import zmq
22 import zmq
23 from zmq.eventloop import ioloop
23 from zmq.eventloop import ioloop
24 from zmq.eventloop.zmqstream import ZMQStream
24 from zmq.eventloop.zmqstream import ZMQStream
25
25
26 # internal:
26 # internal:
27 from IPython.utils.importstring import import_item
27 from IPython.utils.importstring import import_item
28 from IPython.utils.traitlets import (
28 from IPython.utils.traitlets import (
29 HasTraits, Instance, Int, Unicode, Dict, Set, Tuple, CStr
29 HasTraits, Instance, Int, Unicode, Dict, Set, Tuple, CStr
30 )
30 )
31 from IPython.utils.jsonutil import ISO8601, extract_dates
32
31
33 from IPython.parallel import error, util
32 from IPython.parallel import error, util
34 from IPython.parallel.factory import RegistrationFactory
33 from IPython.parallel.factory import RegistrationFactory
35
34
36 from IPython.zmq.session import SessionFactory
35 from IPython.zmq.session import SessionFactory
37
36
38 from .heartmonitor import HeartMonitor
37 from .heartmonitor import HeartMonitor
39
38
40 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
41 # Code
40 # Code
42 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
43
42
44 def _passer(*args, **kwargs):
43 def _passer(*args, **kwargs):
45 return
44 return
46
45
47 def _printer(*args, **kwargs):
46 def _printer(*args, **kwargs):
48 print (args)
47 print (args)
49 print (kwargs)
48 print (kwargs)
50
49
51 def empty_record():
50 def empty_record():
52 """Return an empty dict with all record keys."""
51 """Return an empty dict with all record keys."""
53 return {
52 return {
54 'msg_id' : None,
53 'msg_id' : None,
55 'header' : None,
54 'header' : None,
56 'content': None,
55 'content': None,
57 'buffers': None,
56 'buffers': None,
58 'submitted': None,
57 'submitted': None,
59 'client_uuid' : None,
58 'client_uuid' : None,
60 'engine_uuid' : None,
59 'engine_uuid' : None,
61 'started': None,
60 'started': None,
62 'completed': None,
61 'completed': None,
63 'resubmitted': None,
62 'resubmitted': None,
64 'result_header' : None,
63 'result_header' : None,
65 'result_content' : None,
64 'result_content' : None,
66 'result_buffers' : None,
65 'result_buffers' : None,
67 'queue' : None,
66 'queue' : None,
68 'pyin' : None,
67 'pyin' : None,
69 'pyout': None,
68 'pyout': None,
70 'pyerr': None,
69 'pyerr': None,
71 'stdout': '',
70 'stdout': '',
72 'stderr': '',
71 'stderr': '',
73 }
72 }
74
73
75 def init_record(msg):
74 def init_record(msg):
76 """Initialize a TaskRecord based on a request."""
75 """Initialize a TaskRecord based on a request."""
77 header = extract_dates(msg['header'])
76 header = msg['header']
78 return {
77 return {
79 'msg_id' : header['msg_id'],
78 'msg_id' : header['msg_id'],
80 'header' : header,
79 'header' : header,
81 'content': msg['content'],
80 'content': msg['content'],
82 'buffers': msg['buffers'],
81 'buffers': msg['buffers'],
83 'submitted': header['date'],
82 'submitted': header['date'],
84 'client_uuid' : None,
83 'client_uuid' : None,
85 'engine_uuid' : None,
84 'engine_uuid' : None,
86 'started': None,
85 'started': None,
87 'completed': None,
86 'completed': None,
88 'resubmitted': None,
87 'resubmitted': None,
89 'result_header' : None,
88 'result_header' : None,
90 'result_content' : None,
89 'result_content' : None,
91 'result_buffers' : None,
90 'result_buffers' : None,
92 'queue' : None,
91 'queue' : None,
93 'pyin' : None,
92 'pyin' : None,
94 'pyout': None,
93 'pyout': None,
95 'pyerr': None,
94 'pyerr': None,
96 'stdout': '',
95 'stdout': '',
97 'stderr': '',
96 'stderr': '',
98 }
97 }
99
98
100
99
101 class EngineConnector(HasTraits):
100 class EngineConnector(HasTraits):
102 """A simple object for accessing the various zmq connections of an object.
101 """A simple object for accessing the various zmq connections of an object.
103 Attributes are:
102 Attributes are:
104 id (int): engine ID
103 id (int): engine ID
105 uuid (str): uuid (unused?)
104 uuid (str): uuid (unused?)
106 queue (str): identity of queue's XREQ socket
105 queue (str): identity of queue's XREQ socket
107 registration (str): identity of registration XREQ socket
106 registration (str): identity of registration XREQ socket
108 heartbeat (str): identity of heartbeat XREQ socket
107 heartbeat (str): identity of heartbeat XREQ socket
109 """
108 """
110 id=Int(0)
109 id=Int(0)
111 queue=CStr()
110 queue=CStr()
112 control=CStr()
111 control=CStr()
113 registration=CStr()
112 registration=CStr()
114 heartbeat=CStr()
113 heartbeat=CStr()
115 pending=Set()
114 pending=Set()
116
115
117 class HubFactory(RegistrationFactory):
116 class HubFactory(RegistrationFactory):
118 """The Configurable for setting up a Hub."""
117 """The Configurable for setting up a Hub."""
119
118
120 # port-pairs for monitoredqueues:
119 # port-pairs for monitoredqueues:
121 hb = Tuple(Int,Int,config=True,
120 hb = Tuple(Int,Int,config=True,
122 help="""XREQ/SUB Port pair for Engine heartbeats""")
121 help="""XREQ/SUB Port pair for Engine heartbeats""")
123 def _hb_default(self):
122 def _hb_default(self):
124 return tuple(util.select_random_ports(2))
123 return tuple(util.select_random_ports(2))
125
124
126 mux = Tuple(Int,Int,config=True,
125 mux = Tuple(Int,Int,config=True,
127 help="""Engine/Client Port pair for MUX queue""")
126 help="""Engine/Client Port pair for MUX queue""")
128
127
129 def _mux_default(self):
128 def _mux_default(self):
130 return tuple(util.select_random_ports(2))
129 return tuple(util.select_random_ports(2))
131
130
132 task = Tuple(Int,Int,config=True,
131 task = Tuple(Int,Int,config=True,
133 help="""Engine/Client Port pair for Task queue""")
132 help="""Engine/Client Port pair for Task queue""")
134 def _task_default(self):
133 def _task_default(self):
135 return tuple(util.select_random_ports(2))
134 return tuple(util.select_random_ports(2))
136
135
137 control = Tuple(Int,Int,config=True,
136 control = Tuple(Int,Int,config=True,
138 help="""Engine/Client Port pair for Control queue""")
137 help="""Engine/Client Port pair for Control queue""")
139
138
140 def _control_default(self):
139 def _control_default(self):
141 return tuple(util.select_random_ports(2))
140 return tuple(util.select_random_ports(2))
142
141
143 iopub = Tuple(Int,Int,config=True,
142 iopub = Tuple(Int,Int,config=True,
144 help="""Engine/Client Port pair for IOPub relay""")
143 help="""Engine/Client Port pair for IOPub relay""")
145
144
146 def _iopub_default(self):
145 def _iopub_default(self):
147 return tuple(util.select_random_ports(2))
146 return tuple(util.select_random_ports(2))
148
147
149 # single ports:
148 # single ports:
150 mon_port = Int(config=True,
149 mon_port = Int(config=True,
151 help="""Monitor (SUB) port for queue traffic""")
150 help="""Monitor (SUB) port for queue traffic""")
152
151
153 def _mon_port_default(self):
152 def _mon_port_default(self):
154 return util.select_random_ports(1)[0]
153 return util.select_random_ports(1)[0]
155
154
156 notifier_port = Int(config=True,
155 notifier_port = Int(config=True,
157 help="""PUB port for sending engine status notifications""")
156 help="""PUB port for sending engine status notifications""")
158
157
159 def _notifier_port_default(self):
158 def _notifier_port_default(self):
160 return util.select_random_ports(1)[0]
159 return util.select_random_ports(1)[0]
161
160
162 engine_ip = Unicode('127.0.0.1', config=True,
161 engine_ip = Unicode('127.0.0.1', config=True,
163 help="IP on which to listen for engine connections. [default: loopback]")
162 help="IP on which to listen for engine connections. [default: loopback]")
164 engine_transport = Unicode('tcp', config=True,
163 engine_transport = Unicode('tcp', config=True,
165 help="0MQ transport for engine connections. [default: tcp]")
164 help="0MQ transport for engine connections. [default: tcp]")
166
165
167 client_ip = Unicode('127.0.0.1', config=True,
166 client_ip = Unicode('127.0.0.1', config=True,
168 help="IP on which to listen for client connections. [default: loopback]")
167 help="IP on which to listen for client connections. [default: loopback]")
169 client_transport = Unicode('tcp', config=True,
168 client_transport = Unicode('tcp', config=True,
170 help="0MQ transport for client connections. [default : tcp]")
169 help="0MQ transport for client connections. [default : tcp]")
171
170
172 monitor_ip = Unicode('127.0.0.1', config=True,
171 monitor_ip = Unicode('127.0.0.1', config=True,
173 help="IP on which to listen for monitor messages. [default: loopback]")
172 help="IP on which to listen for monitor messages. [default: loopback]")
174 monitor_transport = Unicode('tcp', config=True,
173 monitor_transport = Unicode('tcp', config=True,
175 help="0MQ transport for monitor messages. [default : tcp]")
174 help="0MQ transport for monitor messages. [default : tcp]")
176
175
177 monitor_url = Unicode('')
176 monitor_url = Unicode('')
178
177
179 db_class = Unicode('IPython.parallel.controller.dictdb.DictDB', config=True,
178 db_class = Unicode('IPython.parallel.controller.dictdb.DictDB', config=True,
180 help="""The class to use for the DB backend""")
179 help="""The class to use for the DB backend""")
181
180
182 # not configurable
181 # not configurable
183 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
182 db = Instance('IPython.parallel.controller.dictdb.BaseDB')
184 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
183 heartmonitor = Instance('IPython.parallel.controller.heartmonitor.HeartMonitor')
185
184
186 def _ip_changed(self, name, old, new):
185 def _ip_changed(self, name, old, new):
187 self.engine_ip = new
186 self.engine_ip = new
188 self.client_ip = new
187 self.client_ip = new
189 self.monitor_ip = new
188 self.monitor_ip = new
190 self._update_monitor_url()
189 self._update_monitor_url()
191
190
192 def _update_monitor_url(self):
191 def _update_monitor_url(self):
193 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
192 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
194
193
195 def _transport_changed(self, name, old, new):
194 def _transport_changed(self, name, old, new):
196 self.engine_transport = new
195 self.engine_transport = new
197 self.client_transport = new
196 self.client_transport = new
198 self.monitor_transport = new
197 self.monitor_transport = new
199 self._update_monitor_url()
198 self._update_monitor_url()
200
199
201 def __init__(self, **kwargs):
200 def __init__(self, **kwargs):
202 super(HubFactory, self).__init__(**kwargs)
201 super(HubFactory, self).__init__(**kwargs)
203 self._update_monitor_url()
202 self._update_monitor_url()
204
203
205
204
206 def construct(self):
205 def construct(self):
207 self.init_hub()
206 self.init_hub()
208
207
209 def start(self):
208 def start(self):
210 self.heartmonitor.start()
209 self.heartmonitor.start()
211 self.log.info("Heartmonitor started")
210 self.log.info("Heartmonitor started")
212
211
213 def init_hub(self):
212 def init_hub(self):
214 """construct"""
213 """construct"""
215 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
214 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
216 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
215 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
217
216
218 ctx = self.context
217 ctx = self.context
219 loop = self.loop
218 loop = self.loop
220
219
221 # Registrar socket
220 # Registrar socket
222 q = ZMQStream(ctx.socket(zmq.XREP), loop)
221 q = ZMQStream(ctx.socket(zmq.XREP), loop)
223 q.bind(client_iface % self.regport)
222 q.bind(client_iface % self.regport)
224 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
223 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
225 if self.client_ip != self.engine_ip:
224 if self.client_ip != self.engine_ip:
226 q.bind(engine_iface % self.regport)
225 q.bind(engine_iface % self.regport)
227 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
226 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
228
227
229 ### Engine connections ###
228 ### Engine connections ###
230
229
231 # heartbeat
230 # heartbeat
232 hpub = ctx.socket(zmq.PUB)
231 hpub = ctx.socket(zmq.PUB)
233 hpub.bind(engine_iface % self.hb[0])
232 hpub.bind(engine_iface % self.hb[0])
234 hrep = ctx.socket(zmq.XREP)
233 hrep = ctx.socket(zmq.XREP)
235 hrep.bind(engine_iface % self.hb[1])
234 hrep.bind(engine_iface % self.hb[1])
236 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
235 self.heartmonitor = HeartMonitor(loop=loop, config=self.config, log=self.log,
237 pingstream=ZMQStream(hpub,loop),
236 pingstream=ZMQStream(hpub,loop),
238 pongstream=ZMQStream(hrep,loop)
237 pongstream=ZMQStream(hrep,loop)
239 )
238 )
240
239
241 ### Client connections ###
240 ### Client connections ###
242 # Notifier socket
241 # Notifier socket
243 n = ZMQStream(ctx.socket(zmq.PUB), loop)
242 n = ZMQStream(ctx.socket(zmq.PUB), loop)
244 n.bind(client_iface%self.notifier_port)
243 n.bind(client_iface%self.notifier_port)
245
244
246 ### build and launch the queues ###
245 ### build and launch the queues ###
247
246
248 # monitor socket
247 # monitor socket
249 sub = ctx.socket(zmq.SUB)
248 sub = ctx.socket(zmq.SUB)
250 sub.setsockopt(zmq.SUBSCRIBE, "")
249 sub.setsockopt(zmq.SUBSCRIBE, "")
251 sub.bind(self.monitor_url)
250 sub.bind(self.monitor_url)
252 sub.bind('inproc://monitor')
251 sub.bind('inproc://monitor')
253 sub = ZMQStream(sub, loop)
252 sub = ZMQStream(sub, loop)
254
253
255 # connect the db
254 # connect the db
256 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
255 self.log.info('Hub using DB backend: %r'%(self.db_class.split()[-1]))
257 # cdir = self.config.Global.cluster_dir
256 # cdir = self.config.Global.cluster_dir
258 self.db = import_item(str(self.db_class))(session=self.session.session, config=self.config)
257 self.db = import_item(str(self.db_class))(session=self.session.session,
258 config=self.config, log=self.log)
259 time.sleep(.25)
259 time.sleep(.25)
260 try:
260 try:
261 scheme = self.config.TaskScheduler.scheme_name
261 scheme = self.config.TaskScheduler.scheme_name
262 except AttributeError:
262 except AttributeError:
263 from .scheduler import TaskScheduler
263 from .scheduler import TaskScheduler
264 scheme = TaskScheduler.scheme_name.get_default_value()
264 scheme = TaskScheduler.scheme_name.get_default_value()
265 # build connection dicts
265 # build connection dicts
266 self.engine_info = {
266 self.engine_info = {
267 'control' : engine_iface%self.control[1],
267 'control' : engine_iface%self.control[1],
268 'mux': engine_iface%self.mux[1],
268 'mux': engine_iface%self.mux[1],
269 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
269 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
270 'task' : engine_iface%self.task[1],
270 'task' : engine_iface%self.task[1],
271 'iopub' : engine_iface%self.iopub[1],
271 'iopub' : engine_iface%self.iopub[1],
272 # 'monitor' : engine_iface%self.mon_port,
272 # 'monitor' : engine_iface%self.mon_port,
273 }
273 }
274
274
275 self.client_info = {
275 self.client_info = {
276 'control' : client_iface%self.control[0],
276 'control' : client_iface%self.control[0],
277 'mux': client_iface%self.mux[0],
277 'mux': client_iface%self.mux[0],
278 'task' : (scheme, client_iface%self.task[0]),
278 'task' : (scheme, client_iface%self.task[0]),
279 'iopub' : client_iface%self.iopub[0],
279 'iopub' : client_iface%self.iopub[0],
280 'notification': client_iface%self.notifier_port
280 'notification': client_iface%self.notifier_port
281 }
281 }
282 self.log.debug("Hub engine addrs: %s"%self.engine_info)
282 self.log.debug("Hub engine addrs: %s"%self.engine_info)
283 self.log.debug("Hub client addrs: %s"%self.client_info)
283 self.log.debug("Hub client addrs: %s"%self.client_info)
284
284
285 # resubmit stream
285 # resubmit stream
286 r = ZMQStream(ctx.socket(zmq.XREQ), loop)
286 r = ZMQStream(ctx.socket(zmq.XREQ), loop)
287 url = util.disambiguate_url(self.client_info['task'][-1])
287 url = util.disambiguate_url(self.client_info['task'][-1])
288 r.setsockopt(zmq.IDENTITY, self.session.session)
288 r.setsockopt(zmq.IDENTITY, self.session.session)
289 r.connect(url)
289 r.connect(url)
290
290
291 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
291 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
292 query=q, notifier=n, resubmit=r, db=self.db,
292 query=q, notifier=n, resubmit=r, db=self.db,
293 engine_info=self.engine_info, client_info=self.client_info,
293 engine_info=self.engine_info, client_info=self.client_info,
294 log=self.log)
294 log=self.log)
295
295
296
296
297 class Hub(SessionFactory):
297 class Hub(SessionFactory):
298 """The IPython Controller Hub with 0MQ connections
298 """The IPython Controller Hub with 0MQ connections
299
299
300 Parameters
300 Parameters
301 ==========
301 ==========
302 loop: zmq IOLoop instance
302 loop: zmq IOLoop instance
303 session: Session object
303 session: Session object
304 <removed> context: zmq context for creating new connections (?)
304 <removed> context: zmq context for creating new connections (?)
305 queue: ZMQStream for monitoring the command queue (SUB)
305 queue: ZMQStream for monitoring the command queue (SUB)
306 query: ZMQStream for engine registration and client queries requests (XREP)
306 query: ZMQStream for engine registration and client queries requests (XREP)
307 heartbeat: HeartMonitor object checking the pulse of the engines
307 heartbeat: HeartMonitor object checking the pulse of the engines
308 notifier: ZMQStream for broadcasting engine registration changes (PUB)
308 notifier: ZMQStream for broadcasting engine registration changes (PUB)
309 db: connection to db for out of memory logging of commands
309 db: connection to db for out of memory logging of commands
310 NotImplemented
310 NotImplemented
311 engine_info: dict of zmq connection information for engines to connect
311 engine_info: dict of zmq connection information for engines to connect
312 to the queues.
312 to the queues.
313 client_info: dict of zmq connection information for engines to connect
313 client_info: dict of zmq connection information for engines to connect
314 to the queues.
314 to the queues.
315 """
315 """
316 # internal data structures:
316 # internal data structures:
317 ids=Set() # engine IDs
317 ids=Set() # engine IDs
318 keytable=Dict()
318 keytable=Dict()
319 by_ident=Dict()
319 by_ident=Dict()
320 engines=Dict()
320 engines=Dict()
321 clients=Dict()
321 clients=Dict()
322 hearts=Dict()
322 hearts=Dict()
323 pending=Set()
323 pending=Set()
324 queues=Dict() # pending msg_ids keyed by engine_id
324 queues=Dict() # pending msg_ids keyed by engine_id
325 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
325 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
326 completed=Dict() # completed msg_ids keyed by engine_id
326 completed=Dict() # completed msg_ids keyed by engine_id
327 all_completed=Set() # completed msg_ids keyed by engine_id
327 all_completed=Set() # completed msg_ids keyed by engine_id
328 dead_engines=Set() # completed msg_ids keyed by engine_id
328 dead_engines=Set() # completed msg_ids keyed by engine_id
329 unassigned=Set() # set of task msg_ds not yet assigned a destination
329 unassigned=Set() # set of task msg_ds not yet assigned a destination
330 incoming_registrations=Dict()
330 incoming_registrations=Dict()
331 registration_timeout=Int()
331 registration_timeout=Int()
332 _idcounter=Int(0)
332 _idcounter=Int(0)
333
333
334 # objects from constructor:
334 # objects from constructor:
335 query=Instance(ZMQStream)
335 query=Instance(ZMQStream)
336 monitor=Instance(ZMQStream)
336 monitor=Instance(ZMQStream)
337 notifier=Instance(ZMQStream)
337 notifier=Instance(ZMQStream)
338 resubmit=Instance(ZMQStream)
338 resubmit=Instance(ZMQStream)
339 heartmonitor=Instance(HeartMonitor)
339 heartmonitor=Instance(HeartMonitor)
340 db=Instance(object)
340 db=Instance(object)
341 client_info=Dict()
341 client_info=Dict()
342 engine_info=Dict()
342 engine_info=Dict()
343
343
344
344
345 def __init__(self, **kwargs):
345 def __init__(self, **kwargs):
346 """
346 """
347 # universal:
347 # universal:
348 loop: IOLoop for creating future connections
348 loop: IOLoop for creating future connections
349 session: streamsession for sending serialized data
349 session: streamsession for sending serialized data
350 # engine:
350 # engine:
351 queue: ZMQStream for monitoring queue messages
351 queue: ZMQStream for monitoring queue messages
352 query: ZMQStream for engine+client registration and client requests
352 query: ZMQStream for engine+client registration and client requests
353 heartbeat: HeartMonitor object for tracking engines
353 heartbeat: HeartMonitor object for tracking engines
354 # extra:
354 # extra:
355 db: ZMQStream for db connection (NotImplemented)
355 db: ZMQStream for db connection (NotImplemented)
356 engine_info: zmq address/protocol dict for engine connections
356 engine_info: zmq address/protocol dict for engine connections
357 client_info: zmq address/protocol dict for client connections
357 client_info: zmq address/protocol dict for client connections
358 """
358 """
359
359
360 super(Hub, self).__init__(**kwargs)
360 super(Hub, self).__init__(**kwargs)
361 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
361 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
362
362
363 # validate connection dicts:
363 # validate connection dicts:
364 for k,v in self.client_info.iteritems():
364 for k,v in self.client_info.iteritems():
365 if k == 'task':
365 if k == 'task':
366 util.validate_url_container(v[1])
366 util.validate_url_container(v[1])
367 else:
367 else:
368 util.validate_url_container(v)
368 util.validate_url_container(v)
369 # util.validate_url_container(self.client_info)
369 # util.validate_url_container(self.client_info)
370 util.validate_url_container(self.engine_info)
370 util.validate_url_container(self.engine_info)
371
371
372 # register our callbacks
372 # register our callbacks
373 self.query.on_recv(self.dispatch_query)
373 self.query.on_recv(self.dispatch_query)
374 self.monitor.on_recv(self.dispatch_monitor_traffic)
374 self.monitor.on_recv(self.dispatch_monitor_traffic)
375
375
376 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
376 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
377 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
377 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
378
378
379 self.monitor_handlers = { 'in' : self.save_queue_request,
379 self.monitor_handlers = { 'in' : self.save_queue_request,
380 'out': self.save_queue_result,
380 'out': self.save_queue_result,
381 'intask': self.save_task_request,
381 'intask': self.save_task_request,
382 'outtask': self.save_task_result,
382 'outtask': self.save_task_result,
383 'tracktask': self.save_task_destination,
383 'tracktask': self.save_task_destination,
384 'incontrol': _passer,
384 'incontrol': _passer,
385 'outcontrol': _passer,
385 'outcontrol': _passer,
386 'iopub': self.save_iopub_message,
386 'iopub': self.save_iopub_message,
387 }
387 }
388
388
389 self.query_handlers = {'queue_request': self.queue_status,
389 self.query_handlers = {'queue_request': self.queue_status,
390 'result_request': self.get_results,
390 'result_request': self.get_results,
391 'history_request': self.get_history,
391 'history_request': self.get_history,
392 'db_request': self.db_query,
392 'db_request': self.db_query,
393 'purge_request': self.purge_results,
393 'purge_request': self.purge_results,
394 'load_request': self.check_load,
394 'load_request': self.check_load,
395 'resubmit_request': self.resubmit_task,
395 'resubmit_request': self.resubmit_task,
396 'shutdown_request': self.shutdown_request,
396 'shutdown_request': self.shutdown_request,
397 'registration_request' : self.register_engine,
397 'registration_request' : self.register_engine,
398 'unregistration_request' : self.unregister_engine,
398 'unregistration_request' : self.unregister_engine,
399 'connection_request': self.connection_request,
399 'connection_request': self.connection_request,
400 }
400 }
401
401
402 # ignore resubmit replies
402 # ignore resubmit replies
403 self.resubmit.on_recv(lambda msg: None, copy=False)
403 self.resubmit.on_recv(lambda msg: None, copy=False)
404
404
405 self.log.info("hub::created hub")
405 self.log.info("hub::created hub")
406
406
407 @property
407 @property
408 def _next_id(self):
408 def _next_id(self):
409 """gemerate a new ID.
409 """gemerate a new ID.
410
410
411 No longer reuse old ids, just count from 0."""
411 No longer reuse old ids, just count from 0."""
412 newid = self._idcounter
412 newid = self._idcounter
413 self._idcounter += 1
413 self._idcounter += 1
414 return newid
414 return newid
415 # newid = 0
415 # newid = 0
416 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
416 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
417 # # print newid, self.ids, self.incoming_registrations
417 # # print newid, self.ids, self.incoming_registrations
418 # while newid in self.ids or newid in incoming:
418 # while newid in self.ids or newid in incoming:
419 # newid += 1
419 # newid += 1
420 # return newid
420 # return newid
421
421
422 #-----------------------------------------------------------------------------
422 #-----------------------------------------------------------------------------
423 # message validation
423 # message validation
424 #-----------------------------------------------------------------------------
424 #-----------------------------------------------------------------------------
425
425
426 def _validate_targets(self, targets):
426 def _validate_targets(self, targets):
427 """turn any valid targets argument into a list of integer ids"""
427 """turn any valid targets argument into a list of integer ids"""
428 if targets is None:
428 if targets is None:
429 # default to all
429 # default to all
430 targets = self.ids
430 targets = self.ids
431
431
432 if isinstance(targets, (int,str,unicode)):
432 if isinstance(targets, (int,str,unicode)):
433 # only one target specified
433 # only one target specified
434 targets = [targets]
434 targets = [targets]
435 _targets = []
435 _targets = []
436 for t in targets:
436 for t in targets:
437 # map raw identities to ids
437 # map raw identities to ids
438 if isinstance(t, (str,unicode)):
438 if isinstance(t, (str,unicode)):
439 t = self.by_ident.get(t, t)
439 t = self.by_ident.get(t, t)
440 _targets.append(t)
440 _targets.append(t)
441 targets = _targets
441 targets = _targets
442 bad_targets = [ t for t in targets if t not in self.ids ]
442 bad_targets = [ t for t in targets if t not in self.ids ]
443 if bad_targets:
443 if bad_targets:
444 raise IndexError("No Such Engine: %r"%bad_targets)
444 raise IndexError("No Such Engine: %r"%bad_targets)
445 if not targets:
445 if not targets:
446 raise IndexError("No Engines Registered")
446 raise IndexError("No Engines Registered")
447 return targets
447 return targets
448
448
449 #-----------------------------------------------------------------------------
449 #-----------------------------------------------------------------------------
450 # dispatch methods (1 per stream)
450 # dispatch methods (1 per stream)
451 #-----------------------------------------------------------------------------
451 #-----------------------------------------------------------------------------
452
452
453
453
454 def dispatch_monitor_traffic(self, msg):
454 def dispatch_monitor_traffic(self, msg):
455 """all ME and Task queue messages come through here, as well as
455 """all ME and Task queue messages come through here, as well as
456 IOPub traffic."""
456 IOPub traffic."""
457 self.log.debug("monitor traffic: %r"%msg[:2])
457 self.log.debug("monitor traffic: %r"%msg[:2])
458 switch = msg[0]
458 switch = msg[0]
459 try:
459 try:
460 idents, msg = self.session.feed_identities(msg[1:])
460 idents, msg = self.session.feed_identities(msg[1:])
461 except ValueError:
461 except ValueError:
462 idents=[]
462 idents=[]
463 if not idents:
463 if not idents:
464 self.log.error("Bad Monitor Message: %r"%msg)
464 self.log.error("Bad Monitor Message: %r"%msg)
465 return
465 return
466 handler = self.monitor_handlers.get(switch, None)
466 handler = self.monitor_handlers.get(switch, None)
467 if handler is not None:
467 if handler is not None:
468 handler(idents, msg)
468 handler(idents, msg)
469 else:
469 else:
470 self.log.error("Invalid monitor topic: %r"%switch)
470 self.log.error("Invalid monitor topic: %r"%switch)
471
471
472
472
473 def dispatch_query(self, msg):
473 def dispatch_query(self, msg):
474 """Route registration requests and queries from clients."""
474 """Route registration requests and queries from clients."""
475 try:
475 try:
476 idents, msg = self.session.feed_identities(msg)
476 idents, msg = self.session.feed_identities(msg)
477 except ValueError:
477 except ValueError:
478 idents = []
478 idents = []
479 if not idents:
479 if not idents:
480 self.log.error("Bad Query Message: %r"%msg)
480 self.log.error("Bad Query Message: %r"%msg)
481 return
481 return
482 client_id = idents[0]
482 client_id = idents[0]
483 try:
483 try:
484 msg = self.session.unpack_message(msg, content=True)
484 msg = self.session.unpack_message(msg, content=True)
485 except Exception:
485 except Exception:
486 content = error.wrap_exception()
486 content = error.wrap_exception()
487 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
487 self.log.error("Bad Query Message: %r"%msg, exc_info=True)
488 self.session.send(self.query, "hub_error", ident=client_id,
488 self.session.send(self.query, "hub_error", ident=client_id,
489 content=content)
489 content=content)
490 return
490 return
491 print( idents, msg)
492 # print client_id, header, parent, content
491 # print client_id, header, parent, content
493 #switch on message type:
492 #switch on message type:
494 msg_type = msg['msg_type']
493 msg_type = msg['msg_type']
495 self.log.info("client::client %r requested %r"%(client_id, msg_type))
494 self.log.info("client::client %r requested %r"%(client_id, msg_type))
496 handler = self.query_handlers.get(msg_type, None)
495 handler = self.query_handlers.get(msg_type, None)
497 try:
496 try:
498 assert handler is not None, "Bad Message Type: %r"%msg_type
497 assert handler is not None, "Bad Message Type: %r"%msg_type
499 except:
498 except:
500 content = error.wrap_exception()
499 content = error.wrap_exception()
501 self.log.error("Bad Message Type: %r"%msg_type, exc_info=True)
500 self.log.error("Bad Message Type: %r"%msg_type, exc_info=True)
502 self.session.send(self.query, "hub_error", ident=client_id,
501 self.session.send(self.query, "hub_error", ident=client_id,
503 content=content)
502 content=content)
504 return
503 return
505
504
506 else:
505 else:
507 handler(idents, msg)
506 handler(idents, msg)
508
507
509 def dispatch_db(self, msg):
508 def dispatch_db(self, msg):
510 """"""
509 """"""
511 raise NotImplementedError
510 raise NotImplementedError
512
511
513 #---------------------------------------------------------------------------
512 #---------------------------------------------------------------------------
514 # handler methods (1 per event)
513 # handler methods (1 per event)
515 #---------------------------------------------------------------------------
514 #---------------------------------------------------------------------------
516
515
517 #----------------------- Heartbeat --------------------------------------
516 #----------------------- Heartbeat --------------------------------------
518
517
519 def handle_new_heart(self, heart):
518 def handle_new_heart(self, heart):
520 """handler to attach to heartbeater.
519 """handler to attach to heartbeater.
521 Called when a new heart starts to beat.
520 Called when a new heart starts to beat.
522 Triggers completion of registration."""
521 Triggers completion of registration."""
523 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
522 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
524 if heart not in self.incoming_registrations:
523 if heart not in self.incoming_registrations:
525 self.log.info("heartbeat::ignoring new heart: %r"%heart)
524 self.log.info("heartbeat::ignoring new heart: %r"%heart)
526 else:
525 else:
527 self.finish_registration(heart)
526 self.finish_registration(heart)
528
527
529
528
530 def handle_heart_failure(self, heart):
529 def handle_heart_failure(self, heart):
531 """handler to attach to heartbeater.
530 """handler to attach to heartbeater.
532 called when a previously registered heart fails to respond to beat request.
531 called when a previously registered heart fails to respond to beat request.
533 triggers unregistration"""
532 triggers unregistration"""
534 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
533 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
535 eid = self.hearts.get(heart, None)
534 eid = self.hearts.get(heart, None)
536 queue = self.engines[eid].queue
535 queue = self.engines[eid].queue
537 if eid is None:
536 if eid is None:
538 self.log.info("heartbeat::ignoring heart failure %r"%heart)
537 self.log.info("heartbeat::ignoring heart failure %r"%heart)
539 else:
538 else:
540 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
539 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
541
540
542 #----------------------- MUX Queue Traffic ------------------------------
541 #----------------------- MUX Queue Traffic ------------------------------
543
542
544 def save_queue_request(self, idents, msg):
543 def save_queue_request(self, idents, msg):
545 if len(idents) < 2:
544 if len(idents) < 2:
546 self.log.error("invalid identity prefix: %r"%idents)
545 self.log.error("invalid identity prefix: %r"%idents)
547 return
546 return
548 queue_id, client_id = idents[:2]
547 queue_id, client_id = idents[:2]
549 try:
548 try:
550 msg = self.session.unpack_message(msg, content=False)
549 msg = self.session.unpack_message(msg, content=False)
551 except Exception:
550 except Exception:
552 self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
551 self.log.error("queue::client %r sent invalid message to %r: %r"%(client_id, queue_id, msg), exc_info=True)
553 return
552 return
554
553
555 eid = self.by_ident.get(queue_id, None)
554 eid = self.by_ident.get(queue_id, None)
556 if eid is None:
555 if eid is None:
557 self.log.error("queue::target %r not registered"%queue_id)
556 self.log.error("queue::target %r not registered"%queue_id)
558 self.log.debug("queue:: valid are: %r"%(self.by_ident.keys()))
557 self.log.debug("queue:: valid are: %r"%(self.by_ident.keys()))
559 return
558 return
560
561 header = msg['header']
562 msg_id = header['msg_id']
563 record = init_record(msg)
559 record = init_record(msg)
560 msg_id = record['msg_id']
564 record['engine_uuid'] = queue_id
561 record['engine_uuid'] = queue_id
565 record['client_uuid'] = client_id
562 record['client_uuid'] = client_id
566 record['queue'] = 'mux'
563 record['queue'] = 'mux'
567
564
568 try:
565 try:
569 # it's posible iopub arrived first:
566 # it's posible iopub arrived first:
570 existing = self.db.get_record(msg_id)
567 existing = self.db.get_record(msg_id)
571 for key,evalue in existing.iteritems():
568 for key,evalue in existing.iteritems():
572 rvalue = record.get(key, None)
569 rvalue = record.get(key, None)
573 if evalue and rvalue and evalue != rvalue:
570 if evalue and rvalue and evalue != rvalue:
574 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
571 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
575 elif evalue and not rvalue:
572 elif evalue and not rvalue:
576 record[key] = evalue
573 record[key] = evalue
577 self.db.update_record(msg_id, record)
574 self.db.update_record(msg_id, record)
578 except KeyError:
575 except KeyError:
579 self.db.add_record(msg_id, record)
576 self.db.add_record(msg_id, record)
580
577
581 self.pending.add(msg_id)
578 self.pending.add(msg_id)
582 self.queues[eid].append(msg_id)
579 self.queues[eid].append(msg_id)
583
580
584 def save_queue_result(self, idents, msg):
581 def save_queue_result(self, idents, msg):
585 if len(idents) < 2:
582 if len(idents) < 2:
586 self.log.error("invalid identity prefix: %r"%idents)
583 self.log.error("invalid identity prefix: %r"%idents)
587 return
584 return
588
585
589 client_id, queue_id = idents[:2]
586 client_id, queue_id = idents[:2]
590 try:
587 try:
591 msg = self.session.unpack_message(msg, content=False)
588 msg = self.session.unpack_message(msg, content=False)
592 except Exception:
589 except Exception:
593 self.log.error("queue::engine %r sent invalid message to %r: %r"%(
590 self.log.error("queue::engine %r sent invalid message to %r: %r"%(
594 queue_id,client_id, msg), exc_info=True)
591 queue_id,client_id, msg), exc_info=True)
595 return
592 return
596
593
597 eid = self.by_ident.get(queue_id, None)
594 eid = self.by_ident.get(queue_id, None)
598 if eid is None:
595 if eid is None:
599 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
596 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
600 return
597 return
601
598
602 parent = msg['parent_header']
599 parent = msg['parent_header']
603 if not parent:
600 if not parent:
604 return
601 return
605 msg_id = parent['msg_id']
602 msg_id = parent['msg_id']
606 if msg_id in self.pending:
603 if msg_id in self.pending:
607 self.pending.remove(msg_id)
604 self.pending.remove(msg_id)
608 self.all_completed.add(msg_id)
605 self.all_completed.add(msg_id)
609 self.queues[eid].remove(msg_id)
606 self.queues[eid].remove(msg_id)
610 self.completed[eid].append(msg_id)
607 self.completed[eid].append(msg_id)
611 elif msg_id not in self.all_completed:
608 elif msg_id not in self.all_completed:
612 # it could be a result from a dead engine that died before delivering the
609 # it could be a result from a dead engine that died before delivering the
613 # result
610 # result
614 self.log.warn("queue:: unknown msg finished %r"%msg_id)
611 self.log.warn("queue:: unknown msg finished %r"%msg_id)
615 return
612 return
616 # update record anyway, because the unregistration could have been premature
613 # update record anyway, because the unregistration could have been premature
617 rheader = extract_dates(msg['header'])
614 rheader = msg['header']
618 completed = rheader['date']
615 completed = rheader['date']
619 started = rheader.get('started', None)
616 started = rheader.get('started', None)
620 result = {
617 result = {
621 'result_header' : rheader,
618 'result_header' : rheader,
622 'result_content': msg['content'],
619 'result_content': msg['content'],
623 'started' : started,
620 'started' : started,
624 'completed' : completed
621 'completed' : completed
625 }
622 }
626
623
627 result['result_buffers'] = msg['buffers']
624 result['result_buffers'] = msg['buffers']
628 try:
625 try:
629 self.db.update_record(msg_id, result)
626 self.db.update_record(msg_id, result)
630 except Exception:
627 except Exception:
631 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
628 self.log.error("DB Error updating record %r"%msg_id, exc_info=True)
632
629
633
630
634 #--------------------- Task Queue Traffic ------------------------------
631 #--------------------- Task Queue Traffic ------------------------------
635
632
636 def save_task_request(self, idents, msg):
633 def save_task_request(self, idents, msg):
637 """Save the submission of a task."""
634 """Save the submission of a task."""
638 client_id = idents[0]
635 client_id = idents[0]
639
636
640 try:
637 try:
641 msg = self.session.unpack_message(msg, content=False)
638 msg = self.session.unpack_message(msg, content=False)
642 except Exception:
639 except Exception:
643 self.log.error("task::client %r sent invalid task message: %r"%(
640 self.log.error("task::client %r sent invalid task message: %r"%(
644 client_id, msg), exc_info=True)
641 client_id, msg), exc_info=True)
645 return
642 return
646 record = init_record(msg)
643 record = init_record(msg)
647
644
648 record['client_uuid'] = client_id
645 record['client_uuid'] = client_id
649 record['queue'] = 'task'
646 record['queue'] = 'task'
650 header = msg['header']
647 header = msg['header']
651 msg_id = header['msg_id']
648 msg_id = header['msg_id']
652 self.pending.add(msg_id)
649 self.pending.add(msg_id)
653 self.unassigned.add(msg_id)
650 self.unassigned.add(msg_id)
654 try:
651 try:
655 # it's posible iopub arrived first:
652 # it's posible iopub arrived first:
656 existing = self.db.get_record(msg_id)
653 existing = self.db.get_record(msg_id)
657 if existing['resubmitted']:
654 if existing['resubmitted']:
658 for key in ('submitted', 'client_uuid', 'buffers'):
655 for key in ('submitted', 'client_uuid', 'buffers'):
659 # don't clobber these keys on resubmit
656 # don't clobber these keys on resubmit
660 # submitted and client_uuid should be different
657 # submitted and client_uuid should be different
661 # and buffers might be big, and shouldn't have changed
658 # and buffers might be big, and shouldn't have changed
662 record.pop(key)
659 record.pop(key)
663 # still check content,header which should not change
660 # still check content,header which should not change
664 # but are not expensive to compare as buffers
661 # but are not expensive to compare as buffers
665
662
666 for key,evalue in existing.iteritems():
663 for key,evalue in existing.iteritems():
667 if key.endswith('buffers'):
664 if key.endswith('buffers'):
668 # don't compare buffers
665 # don't compare buffers
669 continue
666 continue
670 rvalue = record.get(key, None)
667 rvalue = record.get(key, None)
671 if evalue and rvalue and evalue != rvalue:
668 if evalue and rvalue and evalue != rvalue:
672 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
669 self.log.warn("conflicting initial state for record: %r:%r <%r> %r"%(msg_id, rvalue, key, evalue))
673 elif evalue and not rvalue:
670 elif evalue and not rvalue:
674 record[key] = evalue
671 record[key] = evalue
675 self.db.update_record(msg_id, record)
672 self.db.update_record(msg_id, record)
676 except KeyError:
673 except KeyError:
677 self.db.add_record(msg_id, record)
674 self.db.add_record(msg_id, record)
678 except Exception:
675 except Exception:
679 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
676 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
680
677
681 def save_task_result(self, idents, msg):
678 def save_task_result(self, idents, msg):
682 """save the result of a completed task."""
679 """save the result of a completed task."""
683 client_id = idents[0]
680 client_id = idents[0]
684 try:
681 try:
685 msg = self.session.unpack_message(msg, content=False)
682 msg = self.session.unpack_message(msg, content=False)
686 except Exception:
683 except Exception:
687 self.log.error("task::invalid task result message send to %r: %r"%(
684 self.log.error("task::invalid task result message send to %r: %r"%(
688 client_id, msg), exc_info=True)
685 client_id, msg), exc_info=True)
689 return
686 return
690
687
691 parent = msg['parent_header']
688 parent = msg['parent_header']
692 if not parent:
689 if not parent:
693 # print msg
690 # print msg
694 self.log.warn("Task %r had no parent!"%msg)
691 self.log.warn("Task %r had no parent!"%msg)
695 return
692 return
696 msg_id = parent['msg_id']
693 msg_id = parent['msg_id']
697 if msg_id in self.unassigned:
694 if msg_id in self.unassigned:
698 self.unassigned.remove(msg_id)
695 self.unassigned.remove(msg_id)
699
696
700 header = extract_dates(msg['header'])
697 header = msg['header']
701 engine_uuid = header.get('engine', None)
698 engine_uuid = header.get('engine', None)
702 eid = self.by_ident.get(engine_uuid, None)
699 eid = self.by_ident.get(engine_uuid, None)
703
700
704 if msg_id in self.pending:
701 if msg_id in self.pending:
705 self.pending.remove(msg_id)
702 self.pending.remove(msg_id)
706 self.all_completed.add(msg_id)
703 self.all_completed.add(msg_id)
707 if eid is not None:
704 if eid is not None:
708 self.completed[eid].append(msg_id)
705 self.completed[eid].append(msg_id)
709 if msg_id in self.tasks[eid]:
706 if msg_id in self.tasks[eid]:
710 self.tasks[eid].remove(msg_id)
707 self.tasks[eid].remove(msg_id)
711 completed = header['date']
708 completed = header['date']
712 started = header.get('started', None)
709 started = header.get('started', None)
713 result = {
710 result = {
714 'result_header' : header,
711 'result_header' : header,
715 'result_content': msg['content'],
712 'result_content': msg['content'],
716 'started' : started,
713 'started' : started,
717 'completed' : completed,
714 'completed' : completed,
718 'engine_uuid': engine_uuid
715 'engine_uuid': engine_uuid
719 }
716 }
720
717
721 result['result_buffers'] = msg['buffers']
718 result['result_buffers'] = msg['buffers']
722 try:
719 try:
723 self.db.update_record(msg_id, result)
720 self.db.update_record(msg_id, result)
724 except Exception:
721 except Exception:
725 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
722 self.log.error("DB Error saving task request %r"%msg_id, exc_info=True)
726
723
727 else:
724 else:
728 self.log.debug("task::unknown task %r finished"%msg_id)
725 self.log.debug("task::unknown task %r finished"%msg_id)
729
726
730 def save_task_destination(self, idents, msg):
727 def save_task_destination(self, idents, msg):
731 try:
728 try:
732 msg = self.session.unpack_message(msg, content=True)
729 msg = self.session.unpack_message(msg, content=True)
733 except Exception:
730 except Exception:
734 self.log.error("task::invalid task tracking message", exc_info=True)
731 self.log.error("task::invalid task tracking message", exc_info=True)
735 return
732 return
736 content = msg['content']
733 content = msg['content']
737 # print (content)
734 # print (content)
738 msg_id = content['msg_id']
735 msg_id = content['msg_id']
739 engine_uuid = content['engine_id']
736 engine_uuid = content['engine_id']
740 eid = self.by_ident[engine_uuid]
737 eid = self.by_ident[engine_uuid]
741
738
742 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
739 self.log.info("task::task %r arrived on %r"%(msg_id, eid))
743 if msg_id in self.unassigned:
740 if msg_id in self.unassigned:
744 self.unassigned.remove(msg_id)
741 self.unassigned.remove(msg_id)
745 # else:
742 # else:
746 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
743 # self.log.debug("task::task %r not listed as MIA?!"%(msg_id))
747
744
748 self.tasks[eid].append(msg_id)
745 self.tasks[eid].append(msg_id)
749 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
746 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
750 try:
747 try:
751 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
748 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
752 except Exception:
749 except Exception:
753 self.log.error("DB Error saving task destination %r"%msg_id, exc_info=True)
750 self.log.error("DB Error saving task destination %r"%msg_id, exc_info=True)
754
751
755
752
756 def mia_task_request(self, idents, msg):
753 def mia_task_request(self, idents, msg):
757 raise NotImplementedError
754 raise NotImplementedError
758 client_id = idents[0]
755 client_id = idents[0]
759 # content = dict(mia=self.mia,status='ok')
756 # content = dict(mia=self.mia,status='ok')
760 # self.session.send('mia_reply', content=content, idents=client_id)
757 # self.session.send('mia_reply', content=content, idents=client_id)
761
758
762
759
763 #--------------------- IOPub Traffic ------------------------------
760 #--------------------- IOPub Traffic ------------------------------
764
761
765 def save_iopub_message(self, topics, msg):
762 def save_iopub_message(self, topics, msg):
766 """save an iopub message into the db"""
763 """save an iopub message into the db"""
767 # print (topics)
764 # print (topics)
768 try:
765 try:
769 msg = self.session.unpack_message(msg, content=True)
766 msg = self.session.unpack_message(msg, content=True)
770 except Exception:
767 except Exception:
771 self.log.error("iopub::invalid IOPub message", exc_info=True)
768 self.log.error("iopub::invalid IOPub message", exc_info=True)
772 return
769 return
773
770
774 parent = msg['parent_header']
771 parent = msg['parent_header']
775 if not parent:
772 if not parent:
776 self.log.error("iopub::invalid IOPub message: %r"%msg)
773 self.log.error("iopub::invalid IOPub message: %r"%msg)
777 return
774 return
778 msg_id = parent['msg_id']
775 msg_id = parent['msg_id']
779 msg_type = msg['msg_type']
776 msg_type = msg['msg_type']
780 content = msg['content']
777 content = msg['content']
781
778
782 # ensure msg_id is in db
779 # ensure msg_id is in db
783 try:
780 try:
784 rec = self.db.get_record(msg_id)
781 rec = self.db.get_record(msg_id)
785 except KeyError:
782 except KeyError:
786 rec = empty_record()
783 rec = empty_record()
787 rec['msg_id'] = msg_id
784 rec['msg_id'] = msg_id
788 self.db.add_record(msg_id, rec)
785 self.db.add_record(msg_id, rec)
789 # stream
786 # stream
790 d = {}
787 d = {}
791 if msg_type == 'stream':
788 if msg_type == 'stream':
792 name = content['name']
789 name = content['name']
793 s = rec[name] or ''
790 s = rec[name] or ''
794 d[name] = s + content['data']
791 d[name] = s + content['data']
795
792
796 elif msg_type == 'pyerr':
793 elif msg_type == 'pyerr':
797 d['pyerr'] = content
794 d['pyerr'] = content
798 elif msg_type == 'pyin':
795 elif msg_type == 'pyin':
799 d['pyin'] = content['code']
796 d['pyin'] = content['code']
800 else:
797 else:
801 d[msg_type] = content.get('data', '')
798 d[msg_type] = content.get('data', '')
802
799
803 try:
800 try:
804 self.db.update_record(msg_id, d)
801 self.db.update_record(msg_id, d)
805 except Exception:
802 except Exception:
806 self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True)
803 self.log.error("DB Error saving iopub message %r"%msg_id, exc_info=True)
807
804
808
805
809
806
810 #-------------------------------------------------------------------------
807 #-------------------------------------------------------------------------
811 # Registration requests
808 # Registration requests
812 #-------------------------------------------------------------------------
809 #-------------------------------------------------------------------------
813
810
814 def connection_request(self, client_id, msg):
811 def connection_request(self, client_id, msg):
815 """Reply with connection addresses for clients."""
812 """Reply with connection addresses for clients."""
816 self.log.info("client::client %r connected"%client_id)
813 self.log.info("client::client %r connected"%client_id)
817 content = dict(status='ok')
814 content = dict(status='ok')
818 content.update(self.client_info)
815 content.update(self.client_info)
819 jsonable = {}
816 jsonable = {}
820 for k,v in self.keytable.iteritems():
817 for k,v in self.keytable.iteritems():
821 if v not in self.dead_engines:
818 if v not in self.dead_engines:
822 jsonable[str(k)] = v
819 jsonable[str(k)] = v
823 content['engines'] = jsonable
820 content['engines'] = jsonable
824 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
821 self.session.send(self.query, 'connection_reply', content, parent=msg, ident=client_id)
825
822
826 def register_engine(self, reg, msg):
823 def register_engine(self, reg, msg):
827 """Register a new engine."""
824 """Register a new engine."""
828 content = msg['content']
825 content = msg['content']
829 try:
826 try:
830 queue = content['queue']
827 queue = content['queue']
831 except KeyError:
828 except KeyError:
832 self.log.error("registration::queue not specified", exc_info=True)
829 self.log.error("registration::queue not specified", exc_info=True)
833 return
830 return
834 heart = content.get('heartbeat', None)
831 heart = content.get('heartbeat', None)
835 """register a new engine, and create the socket(s) necessary"""
832 """register a new engine, and create the socket(s) necessary"""
836 eid = self._next_id
833 eid = self._next_id
837 # print (eid, queue, reg, heart)
834 # print (eid, queue, reg, heart)
838
835
839 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
836 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
840
837
841 content = dict(id=eid,status='ok')
838 content = dict(id=eid,status='ok')
842 content.update(self.engine_info)
839 content.update(self.engine_info)
843 # check if requesting available IDs:
840 # check if requesting available IDs:
844 if queue in self.by_ident:
841 if queue in self.by_ident:
845 try:
842 try:
846 raise KeyError("queue_id %r in use"%queue)
843 raise KeyError("queue_id %r in use"%queue)
847 except:
844 except:
848 content = error.wrap_exception()
845 content = error.wrap_exception()
849 self.log.error("queue_id %r in use"%queue, exc_info=True)
846 self.log.error("queue_id %r in use"%queue, exc_info=True)
850 elif heart in self.hearts: # need to check unique hearts?
847 elif heart in self.hearts: # need to check unique hearts?
851 try:
848 try:
852 raise KeyError("heart_id %r in use"%heart)
849 raise KeyError("heart_id %r in use"%heart)
853 except:
850 except:
854 self.log.error("heart_id %r in use"%heart, exc_info=True)
851 self.log.error("heart_id %r in use"%heart, exc_info=True)
855 content = error.wrap_exception()
852 content = error.wrap_exception()
856 else:
853 else:
857 for h, pack in self.incoming_registrations.iteritems():
854 for h, pack in self.incoming_registrations.iteritems():
858 if heart == h:
855 if heart == h:
859 try:
856 try:
860 raise KeyError("heart_id %r in use"%heart)
857 raise KeyError("heart_id %r in use"%heart)
861 except:
858 except:
862 self.log.error("heart_id %r in use"%heart, exc_info=True)
859 self.log.error("heart_id %r in use"%heart, exc_info=True)
863 content = error.wrap_exception()
860 content = error.wrap_exception()
864 break
861 break
865 elif queue == pack[1]:
862 elif queue == pack[1]:
866 try:
863 try:
867 raise KeyError("queue_id %r in use"%queue)
864 raise KeyError("queue_id %r in use"%queue)
868 except:
865 except:
869 self.log.error("queue_id %r in use"%queue, exc_info=True)
866 self.log.error("queue_id %r in use"%queue, exc_info=True)
870 content = error.wrap_exception()
867 content = error.wrap_exception()
871 break
868 break
872
869
873 msg = self.session.send(self.query, "registration_reply",
870 msg = self.session.send(self.query, "registration_reply",
874 content=content,
871 content=content,
875 ident=reg)
872 ident=reg)
876
873
877 if content['status'] == 'ok':
874 if content['status'] == 'ok':
878 if heart in self.heartmonitor.hearts:
875 if heart in self.heartmonitor.hearts:
879 # already beating
876 # already beating
880 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
877 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
881 self.finish_registration(heart)
878 self.finish_registration(heart)
882 else:
879 else:
883 purge = lambda : self._purge_stalled_registration(heart)
880 purge = lambda : self._purge_stalled_registration(heart)
884 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
881 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
885 dc.start()
882 dc.start()
886 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
883 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
887 else:
884 else:
888 self.log.error("registration::registration %i failed: %r"%(eid, content['evalue']))
885 self.log.error("registration::registration %i failed: %r"%(eid, content['evalue']))
889 return eid
886 return eid
890
887
891 def unregister_engine(self, ident, msg):
888 def unregister_engine(self, ident, msg):
892 """Unregister an engine that explicitly requested to leave."""
889 """Unregister an engine that explicitly requested to leave."""
893 try:
890 try:
894 eid = msg['content']['id']
891 eid = msg['content']['id']
895 except:
892 except:
896 self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True)
893 self.log.error("registration::bad engine id for unregistration: %r"%ident, exc_info=True)
897 return
894 return
898 self.log.info("registration::unregister_engine(%r)"%eid)
895 self.log.info("registration::unregister_engine(%r)"%eid)
899 # print (eid)
896 # print (eid)
900 uuid = self.keytable[eid]
897 uuid = self.keytable[eid]
901 content=dict(id=eid, queue=uuid)
898 content=dict(id=eid, queue=uuid)
902 self.dead_engines.add(uuid)
899 self.dead_engines.add(uuid)
903 # self.ids.remove(eid)
900 # self.ids.remove(eid)
904 # uuid = self.keytable.pop(eid)
901 # uuid = self.keytable.pop(eid)
905 #
902 #
906 # ec = self.engines.pop(eid)
903 # ec = self.engines.pop(eid)
907 # self.hearts.pop(ec.heartbeat)
904 # self.hearts.pop(ec.heartbeat)
908 # self.by_ident.pop(ec.queue)
905 # self.by_ident.pop(ec.queue)
909 # self.completed.pop(eid)
906 # self.completed.pop(eid)
910 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
907 handleit = lambda : self._handle_stranded_msgs(eid, uuid)
911 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
908 dc = ioloop.DelayedCallback(handleit, self.registration_timeout, self.loop)
912 dc.start()
909 dc.start()
913 ############## TODO: HANDLE IT ################
910 ############## TODO: HANDLE IT ################
914
911
915 if self.notifier:
912 if self.notifier:
916 self.session.send(self.notifier, "unregistration_notification", content=content)
913 self.session.send(self.notifier, "unregistration_notification", content=content)
917
914
918 def _handle_stranded_msgs(self, eid, uuid):
915 def _handle_stranded_msgs(self, eid, uuid):
919 """Handle messages known to be on an engine when the engine unregisters.
916 """Handle messages known to be on an engine when the engine unregisters.
920
917
921 It is possible that this will fire prematurely - that is, an engine will
918 It is possible that this will fire prematurely - that is, an engine will
922 go down after completing a result, and the client will be notified
919 go down after completing a result, and the client will be notified
923 that the result failed and later receive the actual result.
920 that the result failed and later receive the actual result.
924 """
921 """
925
922
926 outstanding = self.queues[eid]
923 outstanding = self.queues[eid]
927
924
928 for msg_id in outstanding:
925 for msg_id in outstanding:
929 self.pending.remove(msg_id)
926 self.pending.remove(msg_id)
930 self.all_completed.add(msg_id)
927 self.all_completed.add(msg_id)
931 try:
928 try:
932 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
929 raise error.EngineError("Engine %r died while running task %r"%(eid, msg_id))
933 except:
930 except:
934 content = error.wrap_exception()
931 content = error.wrap_exception()
935 # build a fake header:
932 # build a fake header:
936 header = {}
933 header = {}
937 header['engine'] = uuid
934 header['engine'] = uuid
938 header['date'] = datetime.now()
935 header['date'] = datetime.now()
939 rec = dict(result_content=content, result_header=header, result_buffers=[])
936 rec = dict(result_content=content, result_header=header, result_buffers=[])
940 rec['completed'] = header['date']
937 rec['completed'] = header['date']
941 rec['engine_uuid'] = uuid
938 rec['engine_uuid'] = uuid
942 try:
939 try:
943 self.db.update_record(msg_id, rec)
940 self.db.update_record(msg_id, rec)
944 except Exception:
941 except Exception:
945 self.log.error("DB Error handling stranded msg %r"%msg_id, exc_info=True)
942 self.log.error("DB Error handling stranded msg %r"%msg_id, exc_info=True)
946
943
947
944
948 def finish_registration(self, heart):
945 def finish_registration(self, heart):
949 """Second half of engine registration, called after our HeartMonitor
946 """Second half of engine registration, called after our HeartMonitor
950 has received a beat from the Engine's Heart."""
947 has received a beat from the Engine's Heart."""
951 try:
948 try:
952 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
949 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
953 except KeyError:
950 except KeyError:
954 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
951 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
955 return
952 return
956 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
953 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
957 if purge is not None:
954 if purge is not None:
958 purge.stop()
955 purge.stop()
959 control = queue
956 control = queue
960 self.ids.add(eid)
957 self.ids.add(eid)
961 self.keytable[eid] = queue
958 self.keytable[eid] = queue
962 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
959 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
963 control=control, heartbeat=heart)
960 control=control, heartbeat=heart)
964 self.by_ident[queue] = eid
961 self.by_ident[queue] = eid
965 self.queues[eid] = list()
962 self.queues[eid] = list()
966 self.tasks[eid] = list()
963 self.tasks[eid] = list()
967 self.completed[eid] = list()
964 self.completed[eid] = list()
968 self.hearts[heart] = eid
965 self.hearts[heart] = eid
969 content = dict(id=eid, queue=self.engines[eid].queue)
966 content = dict(id=eid, queue=self.engines[eid].queue)
970 if self.notifier:
967 if self.notifier:
971 self.session.send(self.notifier, "registration_notification", content=content)
968 self.session.send(self.notifier, "registration_notification", content=content)
972 self.log.info("engine::Engine Connected: %i"%eid)
969 self.log.info("engine::Engine Connected: %i"%eid)
973
970
974 def _purge_stalled_registration(self, heart):
971 def _purge_stalled_registration(self, heart):
975 if heart in self.incoming_registrations:
972 if heart in self.incoming_registrations:
976 eid = self.incoming_registrations.pop(heart)[0]
973 eid = self.incoming_registrations.pop(heart)[0]
977 self.log.info("registration::purging stalled registration: %i"%eid)
974 self.log.info("registration::purging stalled registration: %i"%eid)
978 else:
975 else:
979 pass
976 pass
980
977
981 #-------------------------------------------------------------------------
978 #-------------------------------------------------------------------------
982 # Client Requests
979 # Client Requests
983 #-------------------------------------------------------------------------
980 #-------------------------------------------------------------------------
984
981
985 def shutdown_request(self, client_id, msg):
982 def shutdown_request(self, client_id, msg):
986 """handle shutdown request."""
983 """handle shutdown request."""
987 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
984 self.session.send(self.query, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
988 # also notify other clients of shutdown
985 # also notify other clients of shutdown
989 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
986 self.session.send(self.notifier, 'shutdown_notice', content={'status': 'ok'})
990 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
987 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
991 dc.start()
988 dc.start()
992
989
993 def _shutdown(self):
990 def _shutdown(self):
994 self.log.info("hub::hub shutting down.")
991 self.log.info("hub::hub shutting down.")
995 time.sleep(0.1)
992 time.sleep(0.1)
996 sys.exit(0)
993 sys.exit(0)
997
994
998
995
999 def check_load(self, client_id, msg):
996 def check_load(self, client_id, msg):
1000 content = msg['content']
997 content = msg['content']
1001 try:
998 try:
1002 targets = content['targets']
999 targets = content['targets']
1003 targets = self._validate_targets(targets)
1000 targets = self._validate_targets(targets)
1004 except:
1001 except:
1005 content = error.wrap_exception()
1002 content = error.wrap_exception()
1006 self.session.send(self.query, "hub_error",
1003 self.session.send(self.query, "hub_error",
1007 content=content, ident=client_id)
1004 content=content, ident=client_id)
1008 return
1005 return
1009
1006
1010 content = dict(status='ok')
1007 content = dict(status='ok')
1011 # loads = {}
1008 # loads = {}
1012 for t in targets:
1009 for t in targets:
1013 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1010 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
1014 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1011 self.session.send(self.query, "load_reply", content=content, ident=client_id)
1015
1012
1016
1013
1017 def queue_status(self, client_id, msg):
1014 def queue_status(self, client_id, msg):
1018 """Return the Queue status of one or more targets.
1015 """Return the Queue status of one or more targets.
1019 if verbose: return the msg_ids
1016 if verbose: return the msg_ids
1020 else: return len of each type.
1017 else: return len of each type.
1021 keys: queue (pending MUX jobs)
1018 keys: queue (pending MUX jobs)
1022 tasks (pending Task jobs)
1019 tasks (pending Task jobs)
1023 completed (finished jobs from both queues)"""
1020 completed (finished jobs from both queues)"""
1024 content = msg['content']
1021 content = msg['content']
1025 targets = content['targets']
1022 targets = content['targets']
1026 try:
1023 try:
1027 targets = self._validate_targets(targets)
1024 targets = self._validate_targets(targets)
1028 except:
1025 except:
1029 content = error.wrap_exception()
1026 content = error.wrap_exception()
1030 self.session.send(self.query, "hub_error",
1027 self.session.send(self.query, "hub_error",
1031 content=content, ident=client_id)
1028 content=content, ident=client_id)
1032 return
1029 return
1033 verbose = content.get('verbose', False)
1030 verbose = content.get('verbose', False)
1034 content = dict(status='ok')
1031 content = dict(status='ok')
1035 for t in targets:
1032 for t in targets:
1036 queue = self.queues[t]
1033 queue = self.queues[t]
1037 completed = self.completed[t]
1034 completed = self.completed[t]
1038 tasks = self.tasks[t]
1035 tasks = self.tasks[t]
1039 if not verbose:
1036 if not verbose:
1040 queue = len(queue)
1037 queue = len(queue)
1041 completed = len(completed)
1038 completed = len(completed)
1042 tasks = len(tasks)
1039 tasks = len(tasks)
1043 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1040 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
1044 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1041 content['unassigned'] = list(self.unassigned) if verbose else len(self.unassigned)
1045
1042
1046 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1043 self.session.send(self.query, "queue_reply", content=content, ident=client_id)
1047
1044
1048 def purge_results(self, client_id, msg):
1045 def purge_results(self, client_id, msg):
1049 """Purge results from memory. This method is more valuable before we move
1046 """Purge results from memory. This method is more valuable before we move
1050 to a DB based message storage mechanism."""
1047 to a DB based message storage mechanism."""
1051 content = msg['content']
1048 content = msg['content']
1052 msg_ids = content.get('msg_ids', [])
1049 msg_ids = content.get('msg_ids', [])
1053 reply = dict(status='ok')
1050 reply = dict(status='ok')
1054 if msg_ids == 'all':
1051 if msg_ids == 'all':
1055 try:
1052 try:
1056 self.db.drop_matching_records(dict(completed={'$ne':None}))
1053 self.db.drop_matching_records(dict(completed={'$ne':None}))
1057 except Exception:
1054 except Exception:
1058 reply = error.wrap_exception()
1055 reply = error.wrap_exception()
1059 else:
1056 else:
1060 pending = filter(lambda m: m in self.pending, msg_ids)
1057 pending = filter(lambda m: m in self.pending, msg_ids)
1061 if pending:
1058 if pending:
1062 try:
1059 try:
1063 raise IndexError("msg pending: %r"%pending[0])
1060 raise IndexError("msg pending: %r"%pending[0])
1064 except:
1061 except:
1065 reply = error.wrap_exception()
1062 reply = error.wrap_exception()
1066 else:
1063 else:
1067 try:
1064 try:
1068 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1065 self.db.drop_matching_records(dict(msg_id={'$in':msg_ids}))
1069 except Exception:
1066 except Exception:
1070 reply = error.wrap_exception()
1067 reply = error.wrap_exception()
1071
1068
1072 if reply['status'] == 'ok':
1069 if reply['status'] == 'ok':
1073 eids = content.get('engine_ids', [])
1070 eids = content.get('engine_ids', [])
1074 for eid in eids:
1071 for eid in eids:
1075 if eid not in self.engines:
1072 if eid not in self.engines:
1076 try:
1073 try:
1077 raise IndexError("No such engine: %i"%eid)
1074 raise IndexError("No such engine: %i"%eid)
1078 except:
1075 except:
1079 reply = error.wrap_exception()
1076 reply = error.wrap_exception()
1080 break
1077 break
1081 msg_ids = self.completed.pop(eid)
1078 msg_ids = self.completed.pop(eid)
1082 uid = self.engines[eid].queue
1079 uid = self.engines[eid].queue
1083 try:
1080 try:
1084 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1081 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1085 except Exception:
1082 except Exception:
1086 reply = error.wrap_exception()
1083 reply = error.wrap_exception()
1087 break
1084 break
1088
1085
1089 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1086 self.session.send(self.query, 'purge_reply', content=reply, ident=client_id)
1090
1087
1091 def resubmit_task(self, client_id, msg):
1088 def resubmit_task(self, client_id, msg):
1092 """Resubmit one or more tasks."""
1089 """Resubmit one or more tasks."""
1093 def finish(reply):
1090 def finish(reply):
1094 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1091 self.session.send(self.query, 'resubmit_reply', content=reply, ident=client_id)
1095
1092
1096 content = msg['content']
1093 content = msg['content']
1097 msg_ids = content['msg_ids']
1094 msg_ids = content['msg_ids']
1098 reply = dict(status='ok')
1095 reply = dict(status='ok')
1099 try:
1096 try:
1100 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1097 records = self.db.find_records({'msg_id' : {'$in' : msg_ids}}, keys=[
1101 'header', 'content', 'buffers'])
1098 'header', 'content', 'buffers'])
1102 except Exception:
1099 except Exception:
1103 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1100 self.log.error('db::db error finding tasks to resubmit', exc_info=True)
1104 return finish(error.wrap_exception())
1101 return finish(error.wrap_exception())
1105
1102
1106 # validate msg_ids
1103 # validate msg_ids
1107 found_ids = [ rec['msg_id'] for rec in records ]
1104 found_ids = [ rec['msg_id'] for rec in records ]
1108 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1105 invalid_ids = filter(lambda m: m in self.pending, found_ids)
1109 if len(records) > len(msg_ids):
1106 if len(records) > len(msg_ids):
1110 try:
1107 try:
1111 raise RuntimeError("DB appears to be in an inconsistent state."
1108 raise RuntimeError("DB appears to be in an inconsistent state."
1112 "More matching records were found than should exist")
1109 "More matching records were found than should exist")
1113 except Exception:
1110 except Exception:
1114 return finish(error.wrap_exception())
1111 return finish(error.wrap_exception())
1115 elif len(records) < len(msg_ids):
1112 elif len(records) < len(msg_ids):
1116 missing = [ m for m in msg_ids if m not in found_ids ]
1113 missing = [ m for m in msg_ids if m not in found_ids ]
1117 try:
1114 try:
1118 raise KeyError("No such msg(s): %r"%missing)
1115 raise KeyError("No such msg(s): %r"%missing)
1119 except KeyError:
1116 except KeyError:
1120 return finish(error.wrap_exception())
1117 return finish(error.wrap_exception())
1121 elif invalid_ids:
1118 elif invalid_ids:
1122 msg_id = invalid_ids[0]
1119 msg_id = invalid_ids[0]
1123 try:
1120 try:
1124 raise ValueError("Task %r appears to be inflight"%(msg_id))
1121 raise ValueError("Task %r appears to be inflight"%(msg_id))
1125 except Exception:
1122 except Exception:
1126 return finish(error.wrap_exception())
1123 return finish(error.wrap_exception())
1127
1124
1128 # clear the existing records
1125 # clear the existing records
1129 now = datetime.now()
1126 now = datetime.now()
1130 rec = empty_record()
1127 rec = empty_record()
1131 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1128 map(rec.pop, ['msg_id', 'header', 'content', 'buffers', 'submitted'])
1132 rec['resubmitted'] = now
1129 rec['resubmitted'] = now
1133 rec['queue'] = 'task'
1130 rec['queue'] = 'task'
1134 rec['client_uuid'] = client_id[0]
1131 rec['client_uuid'] = client_id[0]
1135 try:
1132 try:
1136 for msg_id in msg_ids:
1133 for msg_id in msg_ids:
1137 self.all_completed.discard(msg_id)
1134 self.all_completed.discard(msg_id)
1138 self.db.update_record(msg_id, rec)
1135 self.db.update_record(msg_id, rec)
1139 except Exception:
1136 except Exception:
1140 self.log.error('db::db error upating record', exc_info=True)
1137 self.log.error('db::db error upating record', exc_info=True)
1141 reply = error.wrap_exception()
1138 reply = error.wrap_exception()
1142 else:
1139 else:
1143 # send the messages
1140 # send the messages
1144 now_s = now.strftime(ISO8601)
1145 for rec in records:
1141 for rec in records:
1146 header = rec['header']
1142 header = rec['header']
1147 # include resubmitted in header to prevent digest collision
1143 # include resubmitted in header to prevent digest collision
1148 header['resubmitted'] = now_s
1144 header['resubmitted'] = now
1149 msg = self.session.msg(header['msg_type'])
1145 msg = self.session.msg(header['msg_type'])
1150 msg['content'] = rec['content']
1146 msg['content'] = rec['content']
1151 msg['header'] = header
1147 msg['header'] = header
1152 msg['msg_id'] = rec['msg_id']
1148 msg['msg_id'] = rec['msg_id']
1153 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1149 self.session.send(self.resubmit, msg, buffers=rec['buffers'])
1154
1150
1155 finish(dict(status='ok'))
1151 finish(dict(status='ok'))
1156
1152
1157
1153
1158 def _extract_record(self, rec):
1154 def _extract_record(self, rec):
1159 """decompose a TaskRecord dict into subsection of reply for get_result"""
1155 """decompose a TaskRecord dict into subsection of reply for get_result"""
1160 io_dict = {}
1156 io_dict = {}
1161 for key in 'pyin pyout pyerr stdout stderr'.split():
1157 for key in 'pyin pyout pyerr stdout stderr'.split():
1162 io_dict[key] = rec[key]
1158 io_dict[key] = rec[key]
1163 content = { 'result_content': rec['result_content'],
1159 content = { 'result_content': rec['result_content'],
1164 'header': rec['header'],
1160 'header': rec['header'],
1165 'result_header' : rec['result_header'],
1161 'result_header' : rec['result_header'],
1166 'io' : io_dict,
1162 'io' : io_dict,
1167 }
1163 }
1168 if rec['result_buffers']:
1164 if rec['result_buffers']:
1169 buffers = map(str, rec['result_buffers'])
1165 buffers = map(str, rec['result_buffers'])
1170 else:
1166 else:
1171 buffers = []
1167 buffers = []
1172
1168
1173 return content, buffers
1169 return content, buffers
1174
1170
1175 def get_results(self, client_id, msg):
1171 def get_results(self, client_id, msg):
1176 """Get the result of 1 or more messages."""
1172 """Get the result of 1 or more messages."""
1177 content = msg['content']
1173 content = msg['content']
1178 msg_ids = sorted(set(content['msg_ids']))
1174 msg_ids = sorted(set(content['msg_ids']))
1179 statusonly = content.get('status_only', False)
1175 statusonly = content.get('status_only', False)
1180 pending = []
1176 pending = []
1181 completed = []
1177 completed = []
1182 content = dict(status='ok')
1178 content = dict(status='ok')
1183 content['pending'] = pending
1179 content['pending'] = pending
1184 content['completed'] = completed
1180 content['completed'] = completed
1185 buffers = []
1181 buffers = []
1186 if not statusonly:
1182 if not statusonly:
1187 try:
1183 try:
1188 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1184 matches = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1189 # turn match list into dict, for faster lookup
1185 # turn match list into dict, for faster lookup
1190 records = {}
1186 records = {}
1191 for rec in matches:
1187 for rec in matches:
1192 records[rec['msg_id']] = rec
1188 records[rec['msg_id']] = rec
1193 except Exception:
1189 except Exception:
1194 content = error.wrap_exception()
1190 content = error.wrap_exception()
1195 self.session.send(self.query, "result_reply", content=content,
1191 self.session.send(self.query, "result_reply", content=content,
1196 parent=msg, ident=client_id)
1192 parent=msg, ident=client_id)
1197 return
1193 return
1198 else:
1194 else:
1199 records = {}
1195 records = {}
1200 for msg_id in msg_ids:
1196 for msg_id in msg_ids:
1201 if msg_id in self.pending:
1197 if msg_id in self.pending:
1202 pending.append(msg_id)
1198 pending.append(msg_id)
1203 elif msg_id in self.all_completed:
1199 elif msg_id in self.all_completed:
1204 completed.append(msg_id)
1200 completed.append(msg_id)
1205 if not statusonly:
1201 if not statusonly:
1206 c,bufs = self._extract_record(records[msg_id])
1202 c,bufs = self._extract_record(records[msg_id])
1207 content[msg_id] = c
1203 content[msg_id] = c
1208 buffers.extend(bufs)
1204 buffers.extend(bufs)
1209 elif msg_id in records:
1205 elif msg_id in records:
1210 if rec['completed']:
1206 if rec['completed']:
1211 completed.append(msg_id)
1207 completed.append(msg_id)
1212 c,bufs = self._extract_record(records[msg_id])
1208 c,bufs = self._extract_record(records[msg_id])
1213 content[msg_id] = c
1209 content[msg_id] = c
1214 buffers.extend(bufs)
1210 buffers.extend(bufs)
1215 else:
1211 else:
1216 pending.append(msg_id)
1212 pending.append(msg_id)
1217 else:
1213 else:
1218 try:
1214 try:
1219 raise KeyError('No such message: '+msg_id)
1215 raise KeyError('No such message: '+msg_id)
1220 except:
1216 except:
1221 content = error.wrap_exception()
1217 content = error.wrap_exception()
1222 break
1218 break
1223 self.session.send(self.query, "result_reply", content=content,
1219 self.session.send(self.query, "result_reply", content=content,
1224 parent=msg, ident=client_id,
1220 parent=msg, ident=client_id,
1225 buffers=buffers)
1221 buffers=buffers)
1226
1222
1227 def get_history(self, client_id, msg):
1223 def get_history(self, client_id, msg):
1228 """Get a list of all msg_ids in our DB records"""
1224 """Get a list of all msg_ids in our DB records"""
1229 try:
1225 try:
1230 msg_ids = self.db.get_history()
1226 msg_ids = self.db.get_history()
1231 except Exception as e:
1227 except Exception as e:
1232 content = error.wrap_exception()
1228 content = error.wrap_exception()
1233 else:
1229 else:
1234 content = dict(status='ok', history=msg_ids)
1230 content = dict(status='ok', history=msg_ids)
1235
1231
1236 self.session.send(self.query, "history_reply", content=content,
1232 self.session.send(self.query, "history_reply", content=content,
1237 parent=msg, ident=client_id)
1233 parent=msg, ident=client_id)
1238
1234
1239 def db_query(self, client_id, msg):
1235 def db_query(self, client_id, msg):
1240 """Perform a raw query on the task record database."""
1236 """Perform a raw query on the task record database."""
1241 content = msg['content']
1237 content = msg['content']
1242 query = content.get('query', {})
1238 query = content.get('query', {})
1243 keys = content.get('keys', None)
1239 keys = content.get('keys', None)
1244 query = util.extract_dates(query)
1245 buffers = []
1240 buffers = []
1246 empty = list()
1241 empty = list()
1247
1248 try:
1242 try:
1249 records = self.db.find_records(query, keys)
1243 records = self.db.find_records(query, keys)
1250 except Exception as e:
1244 except Exception as e:
1251 content = error.wrap_exception()
1245 content = error.wrap_exception()
1252 else:
1246 else:
1253 # extract buffers from reply content:
1247 # extract buffers from reply content:
1254 if keys is not None:
1248 if keys is not None:
1255 buffer_lens = [] if 'buffers' in keys else None
1249 buffer_lens = [] if 'buffers' in keys else None
1256 result_buffer_lens = [] if 'result_buffers' in keys else None
1250 result_buffer_lens = [] if 'result_buffers' in keys else None
1257 else:
1251 else:
1258 buffer_lens = []
1252 buffer_lens = []
1259 result_buffer_lens = []
1253 result_buffer_lens = []
1260
1254
1261 for rec in records:
1255 for rec in records:
1262 # buffers may be None, so double check
1256 # buffers may be None, so double check
1263 if buffer_lens is not None:
1257 if buffer_lens is not None:
1264 b = rec.pop('buffers', empty) or empty
1258 b = rec.pop('buffers', empty) or empty
1265 buffer_lens.append(len(b))
1259 buffer_lens.append(len(b))
1266 buffers.extend(b)
1260 buffers.extend(b)
1267 if result_buffer_lens is not None:
1261 if result_buffer_lens is not None:
1268 rb = rec.pop('result_buffers', empty) or empty
1262 rb = rec.pop('result_buffers', empty) or empty
1269 result_buffer_lens.append(len(rb))
1263 result_buffer_lens.append(len(rb))
1270 buffers.extend(rb)
1264 buffers.extend(rb)
1271 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1265 content = dict(status='ok', records=records, buffer_lens=buffer_lens,
1272 result_buffer_lens=result_buffer_lens)
1266 result_buffer_lens=result_buffer_lens)
1273
1267
1274 self.session.send(self.query, "db_reply", content=content,
1268 self.session.send(self.query, "db_reply", content=content,
1275 parent=msg, ident=client_id,
1269 parent=msg, ident=client_id,
1276 buffers=buffers)
1270 buffers=buffers)
1277
1271
@@ -1,688 +1,688 b''
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6 """
6 """
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2010-2011 The IPython Development Team
8 # Copyright (C) 2010-2011 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #----------------------------------------------------------------------
14 #----------------------------------------------------------------------
15 # Imports
15 # Imports
16 #----------------------------------------------------------------------
16 #----------------------------------------------------------------------
17
17
18 from __future__ import print_function
18 from __future__ import print_function
19
19
20 import logging
20 import logging
21 import sys
21 import sys
22
22
23 from datetime import datetime, timedelta
23 from datetime import datetime, timedelta
24 from random import randint, random
24 from random import randint, random
25 from types import FunctionType
25 from types import FunctionType
26
26
27 try:
27 try:
28 import numpy
28 import numpy
29 except ImportError:
29 except ImportError:
30 numpy = None
30 numpy = None
31
31
32 import zmq
32 import zmq
33 from zmq.eventloop import ioloop, zmqstream
33 from zmq.eventloop import ioloop, zmqstream
34
34
35 # local imports
35 # local imports
36 from IPython.external.decorator import decorator
36 from IPython.external.decorator import decorator
37 from IPython.config.loader import Config
37 from IPython.config.loader import Config
38 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Str, Enum
38 from IPython.utils.traitlets import Instance, Dict, List, Set, Int, Str, Enum
39
39
40 from IPython.parallel import error
40 from IPython.parallel import error
41 from IPython.parallel.factory import SessionFactory
41 from IPython.parallel.factory import SessionFactory
42 from IPython.parallel.util import connect_logger, local_logger
42 from IPython.parallel.util import connect_logger, local_logger
43
43
44 from .dependency import Dependency
44 from .dependency import Dependency
45
45
46 @decorator
46 @decorator
47 def logged(f,self,*args,**kwargs):
47 def logged(f,self,*args,**kwargs):
48 # print ("#--------------------")
48 # print ("#--------------------")
49 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
49 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
50 # print ("#--")
50 # print ("#--")
51 return f(self,*args, **kwargs)
51 return f(self,*args, **kwargs)
52
52
53 #----------------------------------------------------------------------
53 #----------------------------------------------------------------------
54 # Chooser functions
54 # Chooser functions
55 #----------------------------------------------------------------------
55 #----------------------------------------------------------------------
56
56
57 def plainrandom(loads):
57 def plainrandom(loads):
58 """Plain random pick."""
58 """Plain random pick."""
59 n = len(loads)
59 n = len(loads)
60 return randint(0,n-1)
60 return randint(0,n-1)
61
61
62 def lru(loads):
62 def lru(loads):
63 """Always pick the front of the line.
63 """Always pick the front of the line.
64
64
65 The content of `loads` is ignored.
65 The content of `loads` is ignored.
66
66
67 Assumes LRU ordering of loads, with oldest first.
67 Assumes LRU ordering of loads, with oldest first.
68 """
68 """
69 return 0
69 return 0
70
70
71 def twobin(loads):
71 def twobin(loads):
72 """Pick two at random, use the LRU of the two.
72 """Pick two at random, use the LRU of the two.
73
73
74 The content of loads is ignored.
74 The content of loads is ignored.
75
75
76 Assumes LRU ordering of loads, with oldest first.
76 Assumes LRU ordering of loads, with oldest first.
77 """
77 """
78 n = len(loads)
78 n = len(loads)
79 a = randint(0,n-1)
79 a = randint(0,n-1)
80 b = randint(0,n-1)
80 b = randint(0,n-1)
81 return min(a,b)
81 return min(a,b)
82
82
83 def weighted(loads):
83 def weighted(loads):
84 """Pick two at random using inverse load as weight.
84 """Pick two at random using inverse load as weight.
85
85
86 Return the less loaded of the two.
86 Return the less loaded of the two.
87 """
87 """
88 # weight 0 a million times more than 1:
88 # weight 0 a million times more than 1:
89 weights = 1./(1e-6+numpy.array(loads))
89 weights = 1./(1e-6+numpy.array(loads))
90 sums = weights.cumsum()
90 sums = weights.cumsum()
91 t = sums[-1]
91 t = sums[-1]
92 x = random()*t
92 x = random()*t
93 y = random()*t
93 y = random()*t
94 idx = 0
94 idx = 0
95 idy = 0
95 idy = 0
96 while sums[idx] < x:
96 while sums[idx] < x:
97 idx += 1
97 idx += 1
98 while sums[idy] < y:
98 while sums[idy] < y:
99 idy += 1
99 idy += 1
100 if weights[idy] > weights[idx]:
100 if weights[idy] > weights[idx]:
101 return idy
101 return idy
102 else:
102 else:
103 return idx
103 return idx
104
104
105 def leastload(loads):
105 def leastload(loads):
106 """Always choose the lowest load.
106 """Always choose the lowest load.
107
107
108 If the lowest load occurs more than once, the first
108 If the lowest load occurs more than once, the first
109 occurance will be used. If loads has LRU ordering, this means
109 occurance will be used. If loads has LRU ordering, this means
110 the LRU of those with the lowest load is chosen.
110 the LRU of those with the lowest load is chosen.
111 """
111 """
112 return loads.index(min(loads))
112 return loads.index(min(loads))
113
113
114 #---------------------------------------------------------------------
114 #---------------------------------------------------------------------
115 # Classes
115 # Classes
116 #---------------------------------------------------------------------
116 #---------------------------------------------------------------------
117 # store empty default dependency:
117 # store empty default dependency:
118 MET = Dependency([])
118 MET = Dependency([])
119
119
120 class TaskScheduler(SessionFactory):
120 class TaskScheduler(SessionFactory):
121 """Python TaskScheduler object.
121 """Python TaskScheduler object.
122
122
123 This is the simplest object that supports msg_id based
123 This is the simplest object that supports msg_id based
124 DAG dependencies. *Only* task msg_ids are checked, not
124 DAG dependencies. *Only* task msg_ids are checked, not
125 msg_ids of jobs submitted via the MUX queue.
125 msg_ids of jobs submitted via the MUX queue.
126
126
127 """
127 """
128
128
129 hwm = Int(0, config=True, shortname='hwm',
129 hwm = Int(0, config=True, shortname='hwm',
130 help="""specify the High Water Mark (HWM) for the downstream
130 help="""specify the High Water Mark (HWM) for the downstream
131 socket in the Task scheduler. This is the maximum number
131 socket in the Task scheduler. This is the maximum number
132 of allowed outstanding tasks on each engine."""
132 of allowed outstanding tasks on each engine."""
133 )
133 )
134 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
134 scheme_name = Enum(('leastload', 'pure', 'lru', 'plainrandom', 'weighted', 'twobin'),
135 'leastload', config=True, shortname='scheme', allow_none=False,
135 'leastload', config=True, shortname='scheme', allow_none=False,
136 help="""select the task scheduler scheme [default: Python LRU]
136 help="""select the task scheduler scheme [default: Python LRU]
137 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
137 Options are: 'pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'"""
138 )
138 )
139 def _scheme_name_changed(self, old, new):
139 def _scheme_name_changed(self, old, new):
140 self.log.debug("Using scheme %r"%new)
140 self.log.debug("Using scheme %r"%new)
141 self.scheme = globals()[new]
141 self.scheme = globals()[new]
142
142
143 # input arguments:
143 # input arguments:
144 scheme = Instance(FunctionType) # function for determining the destination
144 scheme = Instance(FunctionType) # function for determining the destination
145 def _scheme_default(self):
145 def _scheme_default(self):
146 return leastload
146 return leastload
147 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
147 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
148 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
148 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
149 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
149 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
150 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
150 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
151
151
152 # internals:
152 # internals:
153 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
153 graph = Dict() # dict by msg_id of [ msg_ids that depend on key ]
154 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
154 retries = Dict() # dict by msg_id of retries remaining (non-neg ints)
155 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
155 # waiting = List() # list of msg_ids ready to run, but haven't due to HWM
156 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
156 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
157 pending = Dict() # dict by engine_uuid of submitted tasks
157 pending = Dict() # dict by engine_uuid of submitted tasks
158 completed = Dict() # dict by engine_uuid of completed tasks
158 completed = Dict() # dict by engine_uuid of completed tasks
159 failed = Dict() # dict by engine_uuid of failed tasks
159 failed = Dict() # dict by engine_uuid of failed tasks
160 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
160 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
161 clients = Dict() # dict by msg_id for who submitted the task
161 clients = Dict() # dict by msg_id for who submitted the task
162 targets = List() # list of target IDENTs
162 targets = List() # list of target IDENTs
163 loads = List() # list of engine loads
163 loads = List() # list of engine loads
164 # full = Set() # set of IDENTs that have HWM outstanding tasks
164 # full = Set() # set of IDENTs that have HWM outstanding tasks
165 all_completed = Set() # set of all completed tasks
165 all_completed = Set() # set of all completed tasks
166 all_failed = Set() # set of all failed tasks
166 all_failed = Set() # set of all failed tasks
167 all_done = Set() # set of all finished tasks=union(completed,failed)
167 all_done = Set() # set of all finished tasks=union(completed,failed)
168 all_ids = Set() # set of all submitted task IDs
168 all_ids = Set() # set of all submitted task IDs
169 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
169 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
170 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
170 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
171
171
172
172
173 def start(self):
173 def start(self):
174 self.engine_stream.on_recv(self.dispatch_result, copy=False)
174 self.engine_stream.on_recv(self.dispatch_result, copy=False)
175 self._notification_handlers = dict(
175 self._notification_handlers = dict(
176 registration_notification = self._register_engine,
176 registration_notification = self._register_engine,
177 unregistration_notification = self._unregister_engine
177 unregistration_notification = self._unregister_engine
178 )
178 )
179 self.notifier_stream.on_recv(self.dispatch_notification)
179 self.notifier_stream.on_recv(self.dispatch_notification)
180 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
180 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # 1 Hz
181 self.auditor.start()
181 self.auditor.start()
182 self.log.info("Scheduler started...%r"%self)
182 self.log.info("Scheduler started [%s]"%self.scheme_name)
183
183
184 def resume_receiving(self):
184 def resume_receiving(self):
185 """Resume accepting jobs."""
185 """Resume accepting jobs."""
186 self.client_stream.on_recv(self.dispatch_submission, copy=False)
186 self.client_stream.on_recv(self.dispatch_submission, copy=False)
187
187
188 def stop_receiving(self):
188 def stop_receiving(self):
189 """Stop accepting jobs while there are no engines.
189 """Stop accepting jobs while there are no engines.
190 Leave them in the ZMQ queue."""
190 Leave them in the ZMQ queue."""
191 self.client_stream.on_recv(None)
191 self.client_stream.on_recv(None)
192
192
193 #-----------------------------------------------------------------------
193 #-----------------------------------------------------------------------
194 # [Un]Registration Handling
194 # [Un]Registration Handling
195 #-----------------------------------------------------------------------
195 #-----------------------------------------------------------------------
196
196
197 def dispatch_notification(self, msg):
197 def dispatch_notification(self, msg):
198 """dispatch register/unregister events."""
198 """dispatch register/unregister events."""
199 try:
199 try:
200 idents,msg = self.session.feed_identities(msg)
200 idents,msg = self.session.feed_identities(msg)
201 except ValueError:
201 except ValueError:
202 self.log.warn("task::Invalid Message: %r"%msg)
202 self.log.warn("task::Invalid Message: %r"%msg)
203 return
203 return
204 try:
204 try:
205 msg = self.session.unpack_message(msg)
205 msg = self.session.unpack_message(msg)
206 except ValueError:
206 except ValueError:
207 self.log.warn("task::Unauthorized message from: %r"%idents)
207 self.log.warn("task::Unauthorized message from: %r"%idents)
208 return
208 return
209
209
210 msg_type = msg['msg_type']
210 msg_type = msg['msg_type']
211
211
212 handler = self._notification_handlers.get(msg_type, None)
212 handler = self._notification_handlers.get(msg_type, None)
213 if handler is None:
213 if handler is None:
214 self.log.error("Unhandled message type: %r"%msg_type)
214 self.log.error("Unhandled message type: %r"%msg_type)
215 else:
215 else:
216 try:
216 try:
217 handler(str(msg['content']['queue']))
217 handler(str(msg['content']['queue']))
218 except KeyError:
218 except KeyError:
219 self.log.error("task::Invalid notification msg: %r"%msg)
219 self.log.error("task::Invalid notification msg: %r"%msg)
220
220
221 @logged
221 @logged
222 def _register_engine(self, uid):
222 def _register_engine(self, uid):
223 """New engine with ident `uid` became available."""
223 """New engine with ident `uid` became available."""
224 # head of the line:
224 # head of the line:
225 self.targets.insert(0,uid)
225 self.targets.insert(0,uid)
226 self.loads.insert(0,0)
226 self.loads.insert(0,0)
227 # initialize sets
227 # initialize sets
228 self.completed[uid] = set()
228 self.completed[uid] = set()
229 self.failed[uid] = set()
229 self.failed[uid] = set()
230 self.pending[uid] = {}
230 self.pending[uid] = {}
231 if len(self.targets) == 1:
231 if len(self.targets) == 1:
232 self.resume_receiving()
232 self.resume_receiving()
233 # rescan the graph:
233 # rescan the graph:
234 self.update_graph(None)
234 self.update_graph(None)
235
235
236 def _unregister_engine(self, uid):
236 def _unregister_engine(self, uid):
237 """Existing engine with ident `uid` became unavailable."""
237 """Existing engine with ident `uid` became unavailable."""
238 if len(self.targets) == 1:
238 if len(self.targets) == 1:
239 # this was our only engine
239 # this was our only engine
240 self.stop_receiving()
240 self.stop_receiving()
241
241
242 # handle any potentially finished tasks:
242 # handle any potentially finished tasks:
243 self.engine_stream.flush()
243 self.engine_stream.flush()
244
244
245 # don't pop destinations, because they might be used later
245 # don't pop destinations, because they might be used later
246 # map(self.destinations.pop, self.completed.pop(uid))
246 # map(self.destinations.pop, self.completed.pop(uid))
247 # map(self.destinations.pop, self.failed.pop(uid))
247 # map(self.destinations.pop, self.failed.pop(uid))
248
248
249 # prevent this engine from receiving work
249 # prevent this engine from receiving work
250 idx = self.targets.index(uid)
250 idx = self.targets.index(uid)
251 self.targets.pop(idx)
251 self.targets.pop(idx)
252 self.loads.pop(idx)
252 self.loads.pop(idx)
253
253
254 # wait 5 seconds before cleaning up pending jobs, since the results might
254 # wait 5 seconds before cleaning up pending jobs, since the results might
255 # still be incoming
255 # still be incoming
256 if self.pending[uid]:
256 if self.pending[uid]:
257 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
257 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
258 dc.start()
258 dc.start()
259 else:
259 else:
260 self.completed.pop(uid)
260 self.completed.pop(uid)
261 self.failed.pop(uid)
261 self.failed.pop(uid)
262
262
263
263
@logged
def handle_stranded_tasks(self, engine):
    """Deal with jobs resident in an engine that died.

    For each task still pending on the dead engine, fabricate an
    'apply_reply' message carrying an EngineError and feed it back through
    ``dispatch_result``, so clients blocked on those tasks receive a reply.
    Finally scrub the engine's completed/failed bookkeeping.
    """
    lost = self.pending[engine]
    for msg_id in lost.keys():
        if msg_id not in self.pending[engine]:
            # prevent double-handling of messages
            # (dispatch_result below mutates self.pending[engine])
            continue

        # pending entries are stored as (raw_msg, targets, after, follow, timeout);
        # only the raw wire message is needed to build the fake reply
        raw_msg = lost[msg_id][0]
        idents,msg = self.session.feed_identities(raw_msg, copy=False)
        parent = self.session.unpack(msg[1].bytes)
        # route the fake reply as [engine, client], mirroring a real reply
        idents = [engine, idents[0]]

        # build fake error reply
        try:
            # raise-then-wrap so error.wrap_exception captures a real traceback
            raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
        except:
            content = error.wrap_exception()
        msg = self.session.msg('apply_reply', content, parent=parent, subheader={'status':'error'})
        raw_reply = map(zmq.Message, self.session.serialize(msg, ident=idents))
        # and dispatch it
        self.dispatch_result(raw_reply)

    # finally scrub completed/failed lists
    self.completed.pop(engine)
    self.failed.pop(engine)
293 #-----------------------------------------------------------------------
293 #-----------------------------------------------------------------------
294 # Job Submission
294 # Job Submission
295 #-----------------------------------------------------------------------
295 #-----------------------------------------------------------------------
@logged
def dispatch_submission(self, raw_msg):
    """Dispatch job submission to appropriate handlers.

    Unpacks the incoming task message, records its id, resolves its time
    ('after') and location ('follow') dependencies, and either submits it
    immediately via ``maybe_run`` or parks it via ``save_unmet`` until its
    dependencies are met.  Invalid messages are logged and dropped.
    """
    # ensure targets up to date:
    self.notifier_stream.flush()
    try:
        idents, msg = self.session.feed_identities(raw_msg, copy=False)
        msg = self.session.unpack_message(msg, content=False, copy=False)
    except Exception:
        # fixed typo in log message: "Invaid" -> "Invalid"
        self.log.error("task::Invalid task msg: %r"%raw_msg, exc_info=True)
        return

    # send to monitor
    self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)

    header = msg['header']
    msg_id = header['msg_id']
    self.all_ids.add(msg_id)

    # targets
    targets = set(header.get('targets', []))
    retries = header.get('retries', 0)
    self.retries[msg_id] = retries

    # time dependencies
    after = Dependency(header.get('after', []))
    if after.all:
        # only unfinished ids matter for an 'all' dependency; drop met ones
        if after.success:
            after.difference_update(self.all_completed)
        if after.failure:
            after.difference_update(self.all_failed)
    if after.check(self.all_completed, self.all_failed):
        # recast as empty set, if `after` already met,
        # to prevent unnecessary set comparisons
        after = MET

    # location dependencies
    follow = Dependency(header.get('follow', []))

    # turn timeouts into datetime objects:
    timeout = header.get('timeout', None)
    if timeout:
        # NOTE(review): assumes timedelta is imported at module level — confirm
        timeout = datetime.now() + timedelta(0,timeout,0)

    # canonical storage order used throughout: [raw_msg, targets, after, follow, timeout]
    args = [raw_msg, targets, after, follow, timeout]

    # validate and reduce dependencies:
    for dep in after,follow:
        # check valid: a task may not depend on itself or on unknown ids
        if msg_id in dep or dep.difference(self.all_ids):
            self.depending[msg_id] = args
            return self.fail_unreachable(msg_id, error.InvalidDependency)
        # check if unreachable:
        if dep.unreachable(self.all_completed, self.all_failed):
            self.depending[msg_id] = args
            return self.fail_unreachable(msg_id)

    if after.check(self.all_completed, self.all_failed):
        # time deps already met, try to run
        if not self.maybe_run(msg_id, *args):
            # can't run yet
            if msg_id not in self.all_failed:
                # could have failed as unreachable
                self.save_unmet(msg_id, *args)
    else:
        self.save_unmet(msg_id, *args)
# @logged
def audit_timeouts(self):
    """Audit all waiting tasks for expired timeouts.

    Any depending task whose timeout deadline has passed is failed with
    ``error.TaskTimeout`` via ``fail_unreachable``.
    """
    now = datetime.now()
    # snapshot the keys: fail_unreachable pops from self.depending while we iterate
    for msg_id in list(self.depending.keys()):
        # must recheck, in case one failure cascaded to another:
        if msg_id in self.depending:
            # entries are stored as [raw_msg, targets, after, follow, timeout];
            # the original unpacked after/targets in swapped order (harmless here
            # since only `timeout` is read, but misleading) — fixed.
            raw_msg, targets, after, follow, timeout = self.depending[msg_id]
            if timeout and timeout < now:
                self.fail_unreachable(msg_id, error.TaskTimeout)
@logged
def fail_unreachable(self, msg_id, why=error.ImpossibleDependency):
    """a task has become unreachable, send a reply with an ImpossibleDependency
    error.

    Removes the task from the dependency bookkeeping, synthesizes an
    'apply_reply' carrying ``why`` back to the client and the Hub monitor,
    and updates the dependency graph as a failure.
    """
    if msg_id not in self.depending:
        self.log.error("msg %r already failed!"%msg_id)
        return
    raw_msg,targets,after,follow,timeout = self.depending.pop(msg_id)
    # detach this task from everything it was waiting on
    for mid in follow.union(after):
        if mid in self.graph:
            self.graph[mid].remove(msg_id)

    # FIXME: unpacking a message I've already unpacked, but didn't save:
    idents,msg = self.session.feed_identities(raw_msg, copy=False)
    header = self.session.unpack(msg[1].bytes)

    # raise-then-wrap so error.wrap_exception captures a real traceback
    try:
        raise why()
    except:
        content = error.wrap_exception()

    self.all_done.add(msg_id)
    self.all_failed.add(msg_id)

    msg = self.session.send(self.client_stream, 'apply_reply', content,
                                                parent=header, ident=idents)
    # also inform the Hub monitor of the (failed) task result
    self.session.send(self.mon_stream, msg, ident=['outtask']+idents)

    # a failure can cascade: dependents may now be unreachable too
    self.update_graph(msg_id, success=False)
@logged
def maybe_run(self, msg_id, raw_msg, targets, after, follow, timeout):
    """check location dependencies, and run if they are met.

    Returns True if the task was submitted (or failed as unreachable by a
    nested fail_unreachable call counts as handled=False), False if it
    cannot run yet.  May mark the task unreachable as a side effect.
    """
    blacklist = self.blacklist.setdefault(msg_id, set())
    if follow or targets or blacklist or self.hwm:
        # we need a can_run filter
        def can_run(idx):
            # check hwm: engine at its high-water mark cannot take more work
            if self.hwm and self.loads[idx] == self.hwm:
                return False
            target = self.targets[idx]
            # check blacklist: engines that already failed this task
            if target in blacklist:
                return False
            # check targets: explicit destination restriction, if any
            if targets and target not in targets:
                return False
            # check follow: location deps satisfied on this engine?
            return follow.check(self.completed[target], self.failed[target])

        # NOTE(review): Python 2 filter() returns a list here; `not indices`
        # below relies on that (a py3 filter iterator would always be truthy)
        indices = filter(can_run, range(len(self.targets)))

        if not indices:
            # couldn't run
            if follow.all:
                # check follow for impossibility: an 'all' follow dep whose
                # prerequisites ran on more than one engine can never be met
                dests = set()
                relevant = set()
                if follow.success:
                    relevant = self.all_completed
                if follow.failure:
                    relevant = relevant.union(self.all_failed)
                for m in follow.intersection(relevant):
                    dests.add(self.destinations[m])
                if len(dests) > 1:
                    self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
                    self.fail_unreachable(msg_id)
                    return False
            if targets:
                # check blacklist+targets for impossibility
                targets.difference_update(blacklist)
                if not targets or not targets.intersection(self.targets):
                    self.depending[msg_id] = (raw_msg, targets, after, follow, timeout)
                    self.fail_unreachable(msg_id)
                    return False
            return False
    else:
        # no restrictions at all: any engine may take the task
        indices = None

    self.submit_task(msg_id, raw_msg, targets, follow, timeout, indices)
    return True
@logged
def save_unmet(self, msg_id, raw_msg, targets, after, follow, timeout):
    """Save a message for later submission when its dependencies are met."""
    self.depending[msg_id] = [raw_msg, targets, after, follow, timeout]
    # register this task against every unfinished id it waits on,
    # so update_graph can find it when those dependencies complete
    for dep_id in after.union(follow).difference(self.all_done):
        self.graph.setdefault(dep_id, set()).add(msg_id)
@logged
def submit_task(self, msg_id, raw_msg, targets, follow, timeout, indices=None):
    """Submit a task to any of a subset of our targets.

    `indices` restricts the choice to those engine indices (None means any
    engine).  The scheme callable picks among the candidate loads; the job
    is then forwarded on the engine stream, load bookkeeping is updated,
    and the Hub is notified of the task's destination.
    """
    if indices:
        loads = [self.loads[i] for i in indices]
    else:
        loads = self.loads
    # self.scheme picks an index into `loads` (e.g. least-loaded)
    idx = self.scheme(loads)
    if indices:
        # map the position within the restricted list back to a real index
        idx = indices[idx]
    target = self.targets[idx]
    # print (target, map(str, msg[:3]))
    # send job to the engine: identity frame first, then the raw message
    self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
    self.engine_stream.send_multipart(raw_msg, copy=False)
    # update load
    self.add_job(idx)
    # time deps are met by now, so store MET in the 'after' slot
    self.pending[target][msg_id] = (raw_msg, targets, MET, follow, timeout)
    # notify Hub
    content = dict(msg_id=msg_id, engine_id=target)
    self.session.send(self.mon_stream, 'task_destination', content=content,
                    ident=['tracktask',self.session.session])
491 #-----------------------------------------------------------------------
491 #-----------------------------------------------------------------------
492 # Result Handling
492 # Result Handling
493 #-----------------------------------------------------------------------
493 #-----------------------------------------------------------------------
@logged
def dispatch_result(self, raw_msg):
    """dispatch method for result replies.

    Unpacks an engine's reply, updates the engine's load, and routes the
    result: real results (success, or failure with no retries left) are
    relayed to the client and the Hub; unmet-dependency replies and
    retryable failures are resubmitted via ``handle_unmet_dependency``.
    """
    try:
        idents,msg = self.session.feed_identities(raw_msg, copy=False)
        msg = self.session.unpack_message(msg, content=False, copy=False)
        engine = idents[0]
        try:
            idx = self.targets.index(engine)
        except ValueError:
            pass # skip load-update for dead engines
        else:
            self.finish_job(idx)
    except Exception:
        # fixed typo in log message: "Invaid" -> "Invalid"
        self.log.error("task::Invalid result: %r"%raw_msg, exc_info=True)
        return

    header = msg['header']
    parent = msg['parent_header']
    if header.get('dependencies_met', True):
        success = (header['status'] == 'ok')
        msg_id = parent['msg_id']
        retries = self.retries[msg_id]
        if not success and retries > 0:
            # failed, but retries remain: decrement and resubmit elsewhere
            self.retries[msg_id] = retries - 1
            self.handle_unmet_dependency(idents, parent)
        else:
            del self.retries[msg_id]
            # relay to client and update graph
            self.handle_result(idents, parent, raw_msg, success)
            # send to Hub monitor
            self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
    else:
        self.handle_unmet_dependency(idents, parent)
@logged
def handle_result(self, idents, parent, raw_msg, success=True):
    """handle a real task result, either success or failure"""
    # first, relay result to client
    engine, client = idents[0], idents[1]
    # swap_ids for XREP-XREP mirror
    raw_msg[:2] = [client, engine]
    # print (map(str, raw_msg[:4]))
    self.client_stream.send_multipart(raw_msg, copy=False)

    # now, update our data structures
    msg_id = parent['msg_id']
    self.blacklist.pop(msg_id, None)
    self.pending[engine].pop(msg_id)

    if success:
        per_engine, overall = self.completed, self.all_completed
    else:
        per_engine, overall = self.failed, self.all_failed
    per_engine[engine].add(msg_id)
    overall.add(msg_id)
    self.all_done.add(msg_id)
    self.destinations[msg_id] = engine

    self.update_graph(msg_id, success)
@logged
def handle_unmet_dependency(self, idents, parent):
    """handle an unmet dependency

    The engine rejected (or failed) the task: blacklist this engine for the
    task, then either fail the task as unreachable (if every allowed target
    is now blacklisted) or try to resubmit it elsewhere.
    """
    engine = idents[0]
    msg_id = parent['msg_id']

    if msg_id not in self.blacklist:
        self.blacklist[msg_id] = set()
    self.blacklist[msg_id].add(engine)

    args = self.pending[engine].pop(msg_id)
    raw,targets,after,follow,timeout = args

    if self.blacklist[msg_id] == targets:
        # every explicitly-requested target has rejected the task
        self.depending[msg_id] = args
        self.fail_unreachable(msg_id)
    elif not self.maybe_run(msg_id, *args):
        # resubmit failed
        if msg_id not in self.all_failed:
            # put it back in our dependency tree
            self.save_unmet(msg_id, *args)

    if self.hwm:
        try:
            idx = self.targets.index(engine)
        except ValueError:
            pass # skip load-update for dead engines
        else:
            # this engine just dropped below the high-water mark,
            # so other parked tasks may now be runnable
            if self.loads[idx] == self.hwm-1:
                self.update_graph(None)
@logged
def update_graph(self, dep_id=None, success=True):
    """dep_id just finished. Update our dependency
    graph and submit any jobs that just became runable.

    Called with dep_id=None to update entire graph for hwm, but without finishing
    a task.
    """
    # update any jobs that depended on the dependency
    jobs = self.graph.pop(dep_id, [])

    # recheck *all* jobs if
    # a) we have HWM and an engine just become no longer full
    # or b) dep_id was given as None
    if dep_id is None or self.hwm and any( [ load==self.hwm-1 for load in self.loads ]):
        jobs = self.depending.keys()

    # iterate a snapshot: fail_unreachable below re-enters update_graph and
    # may pop entries from self.depending while we loop
    for msg_id in list(jobs):
        if msg_id not in self.depending:
            # already handled by a cascaded failure — without this guard the
            # lookup below raises KeyError
            continue
        raw_msg, targets, after, follow, timeout = self.depending[msg_id]

        if after.unreachable(self.all_completed, self.all_failed)\
                or follow.unreachable(self.all_completed, self.all_failed):
            self.fail_unreachable(msg_id)

        elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
            if self.maybe_run(msg_id, raw_msg, targets, MET, follow, timeout):

                self.depending.pop(msg_id)
                # detach the now-submitted task from everything it waited on
                for mid in follow.union(after):
                    if mid in self.graph:
                        self.graph[mid].remove(msg_id)
627 #----------------------------------------------------------------------
627 #----------------------------------------------------------------------
628 # methods to be overridden by subclasses
628 # methods to be overridden by subclasses
629 #----------------------------------------------------------------------
629 #----------------------------------------------------------------------
630
630
def add_job(self, idx):
    """Called after self.targets[idx] just got the job with header.
    Override with subclasses. The default ordering is simple LRU.
    The default loads are the number of outstanding jobs."""
    # one more outstanding job on this engine
    self.loads[idx] += 1
    # rotate the engine to the back of both parallel lists (LRU order)
    self.targets.append(self.targets.pop(idx))
    self.loads.append(self.loads.pop(idx))
def finish_job(self, idx):
    """Called after self.targets[idx] just finished a job.
    Override with subclasses."""
    # one fewer outstanding job on this engine
    self.loads[idx] = self.loads[idx] - 1
def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None,
                        logname='root', log_url=None, loglevel=logging.DEBUG,
                        identity=b'task'):
    """Create and run a TaskScheduler in the current process.

    Binds XREP sockets for clients (in_addr) and engines (out_addr),
    connects PUB/SUB sockets to the Hub's monitor (mon_addr) and
    notification (not_addr) channels, sets up logging, then starts the
    scheduler and blocks in the IOLoop until interrupted.
    """
    from zmq.eventloop import ioloop
    from zmq.eventloop.zmqstream import ZMQStream

    if config:
        # unwrap dict back into Config
        config = Config(config)

    ctx = zmq.Context()
    loop = ioloop.IOLoop()
    # client-facing socket; IDENTITY must be set before bind
    ins = ZMQStream(ctx.socket(zmq.XREP),loop)
    ins.setsockopt(zmq.IDENTITY, identity)
    ins.bind(in_addr)

    # engine-facing socket
    outs = ZMQStream(ctx.socket(zmq.XREP),loop)
    outs.setsockopt(zmq.IDENTITY, identity)
    outs.bind(out_addr)
    # publish task traffic to the Hub monitor
    mons = ZMQStream(ctx.socket(zmq.PUB),loop)
    mons.connect(mon_addr)
    # subscribe to all Hub notifications (engine (un)registration)
    nots = ZMQStream(ctx.socket(zmq.SUB),loop)
    nots.setsockopt(zmq.SUBSCRIBE, '')
    nots.connect(not_addr)

    # setup logging. Note that these will not work in-process, because they clobber
    # existing loggers.
    if log_url:
        log = connect_logger(logname, ctx, log_url, root="scheduler", loglevel=loglevel)
    else:
        log = local_logger(logname, loglevel)

    scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
                            mon_stream=mons, notifier_stream=nots,
                            loop=loop, log=log,
                            config=config)
    scheduler.start()
    try:
        loop.start()
    except KeyboardInterrupt:
        print ("interrupted, exiting...", file=sys.__stderr__)
@@ -1,434 +1,433 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """
2 """
3 Kernel adapted from kernel.py to use ZMQ Streams
3 Kernel adapted from kernel.py to use ZMQ Streams
4 """
4 """
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2010-2011 The IPython Development Team
6 # Copyright (C) 2010-2011 The IPython Development Team
7 #
7 #
8 # Distributed under the terms of the BSD License. The full license is in
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING, distributed as part of this software.
9 # the file COPYING, distributed as part of this software.
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Imports
13 # Imports
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15
15
16 # Standard library imports.
16 # Standard library imports.
17 from __future__ import print_function
17 from __future__ import print_function
18
18
19 import sys
19 import sys
20 import time
20 import time
21
21
22 from code import CommandCompiler
22 from code import CommandCompiler
23 from datetime import datetime
23 from datetime import datetime
24 from pprint import pprint
24 from pprint import pprint
25
25
26 # System library imports.
26 # System library imports.
27 import zmq
27 import zmq
28 from zmq.eventloop import ioloop, zmqstream
28 from zmq.eventloop import ioloop, zmqstream
29
29
30 # Local imports.
30 # Local imports.
31 from IPython.utils.jsonutil import ISO8601
32 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode
31 from IPython.utils.traitlets import Instance, List, Int, Dict, Set, Unicode
33 from IPython.zmq.completer import KernelCompleter
32 from IPython.zmq.completer import KernelCompleter
34
33
35 from IPython.parallel.error import wrap_exception
34 from IPython.parallel.error import wrap_exception
36 from IPython.parallel.factory import SessionFactory
35 from IPython.parallel.factory import SessionFactory
37 from IPython.parallel.util import serialize_object, unpack_apply_message
36 from IPython.parallel.util import serialize_object, unpack_apply_message
38
37
39 def printer(*args):
38 def printer(*args):
40 pprint(args, stream=sys.__stdout__)
39 pprint(args, stream=sys.__stdout__)
41
40
42
41
43 class _Passer(zmqstream.ZMQStream):
42 class _Passer(zmqstream.ZMQStream):
44 """Empty class that implements `send()` that does nothing.
43 """Empty class that implements `send()` that does nothing.
45
44
46 Subclass ZMQStream for Session typechecking
45 Subclass ZMQStream for Session typechecking
47
46
48 """
47 """
49 def __init__(self, *args, **kwargs):
48 def __init__(self, *args, **kwargs):
50 pass
49 pass
51
50
52 def send(self, *args, **kwargs):
51 def send(self, *args, **kwargs):
53 pass
52 pass
54 send_multipart = send
53 send_multipart = send
55
54
56
55
57 #-----------------------------------------------------------------------------
56 #-----------------------------------------------------------------------------
58 # Main kernel class
57 # Main kernel class
59 #-----------------------------------------------------------------------------
58 #-----------------------------------------------------------------------------
60
59
61 class Kernel(SessionFactory):
60 class Kernel(SessionFactory):
62
61
63 #---------------------------------------------------------------------------
62 #---------------------------------------------------------------------------
64 # Kernel interface
63 # Kernel interface
65 #---------------------------------------------------------------------------
64 #---------------------------------------------------------------------------
66
65
67 # kwargs:
66 # kwargs:
68 exec_lines = List(Unicode, config=True,
67 exec_lines = List(Unicode, config=True,
69 help="List of lines to execute")
68 help="List of lines to execute")
70
69
71 int_id = Int(-1)
70 int_id = Int(-1)
72 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
71 user_ns = Dict(config=True, help="""Set the user's namespace of the Kernel""")
73
72
74 control_stream = Instance(zmqstream.ZMQStream)
73 control_stream = Instance(zmqstream.ZMQStream)
75 task_stream = Instance(zmqstream.ZMQStream)
74 task_stream = Instance(zmqstream.ZMQStream)
76 iopub_stream = Instance(zmqstream.ZMQStream)
75 iopub_stream = Instance(zmqstream.ZMQStream)
77 client = Instance('IPython.parallel.Client')
76 client = Instance('IPython.parallel.Client')
78
77
79 # internals
78 # internals
80 shell_streams = List()
79 shell_streams = List()
81 compiler = Instance(CommandCompiler, (), {})
80 compiler = Instance(CommandCompiler, (), {})
82 completer = Instance(KernelCompleter)
81 completer = Instance(KernelCompleter)
83
82
84 aborted = Set()
83 aborted = Set()
85 shell_handlers = Dict()
84 shell_handlers = Dict()
86 control_handlers = Dict()
85 control_handlers = Dict()
87
86
88 def _set_prefix(self):
87 def _set_prefix(self):
89 self.prefix = "engine.%s"%self.int_id
88 self.prefix = "engine.%s"%self.int_id
90
89
91 def _connect_completer(self):
90 def _connect_completer(self):
92 self.completer = KernelCompleter(self.user_ns)
91 self.completer = KernelCompleter(self.user_ns)
93
92
94 def __init__(self, **kwargs):
93 def __init__(self, **kwargs):
95 super(Kernel, self).__init__(**kwargs)
94 super(Kernel, self).__init__(**kwargs)
96 self._set_prefix()
95 self._set_prefix()
97 self._connect_completer()
96 self._connect_completer()
98
97
99 self.on_trait_change(self._set_prefix, 'id')
98 self.on_trait_change(self._set_prefix, 'id')
100 self.on_trait_change(self._connect_completer, 'user_ns')
99 self.on_trait_change(self._connect_completer, 'user_ns')
101
100
102 # Build dict of handlers for message types
101 # Build dict of handlers for message types
103 for msg_type in ['execute_request', 'complete_request', 'apply_request',
102 for msg_type in ['execute_request', 'complete_request', 'apply_request',
104 'clear_request']:
103 'clear_request']:
105 self.shell_handlers[msg_type] = getattr(self, msg_type)
104 self.shell_handlers[msg_type] = getattr(self, msg_type)
106
105
107 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
106 for msg_type in ['shutdown_request', 'abort_request']+self.shell_handlers.keys():
108 self.control_handlers[msg_type] = getattr(self, msg_type)
107 self.control_handlers[msg_type] = getattr(self, msg_type)
109
108
110 self._initial_exec_lines()
109 self._initial_exec_lines()
111
110
112 def _wrap_exception(self, method=None):
111 def _wrap_exception(self, method=None):
113 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
112 e_info = dict(engine_uuid=self.ident, engine_id=self.int_id, method=method)
114 content=wrap_exception(e_info)
113 content=wrap_exception(e_info)
115 return content
114 return content
116
115
117 def _initial_exec_lines(self):
116 def _initial_exec_lines(self):
118 s = _Passer()
117 s = _Passer()
119 content = dict(silent=True, user_variable=[],user_expressions=[])
118 content = dict(silent=True, user_variable=[],user_expressions=[])
120 for line in self.exec_lines:
119 for line in self.exec_lines:
121 self.log.debug("executing initialization: %s"%line)
120 self.log.debug("executing initialization: %s"%line)
122 content.update({'code':line})
121 content.update({'code':line})
123 msg = self.session.msg('execute_request', content)
122 msg = self.session.msg('execute_request', content)
124 self.execute_request(s, [], msg)
123 self.execute_request(s, [], msg)
125
124
126
125
127 #-------------------- control handlers -----------------------------
126 #-------------------- control handlers -----------------------------
128 def abort_queues(self):
127 def abort_queues(self):
129 for stream in self.shell_streams:
128 for stream in self.shell_streams:
130 if stream:
129 if stream:
131 self.abort_queue(stream)
130 self.abort_queue(stream)
132
131
133 def abort_queue(self, stream):
132 def abort_queue(self, stream):
134 while True:
133 while True:
135 try:
134 try:
136 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
135 msg = self.session.recv(stream, zmq.NOBLOCK,content=True)
137 except zmq.ZMQError as e:
136 except zmq.ZMQError as e:
138 if e.errno == zmq.EAGAIN:
137 if e.errno == zmq.EAGAIN:
139 break
138 break
140 else:
139 else:
141 return
140 return
142 else:
141 else:
143 if msg is None:
142 if msg is None:
144 return
143 return
145 else:
144 else:
146 idents,msg = msg
145 idents,msg = msg
147
146
148 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
147 # assert self.reply_socketly_socket.rcvmore(), "Unexpected missing message part."
149 # msg = self.reply_socket.recv_json()
148 # msg = self.reply_socket.recv_json()
150 self.log.info("Aborting:")
149 self.log.info("Aborting:")
151 self.log.info(str(msg))
150 self.log.info(str(msg))
152 msg_type = msg['msg_type']
151 msg_type = msg['msg_type']
153 reply_type = msg_type.split('_')[0] + '_reply'
152 reply_type = msg_type.split('_')[0] + '_reply'
154 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
153 # reply_msg = self.session.msg(reply_type, {'status' : 'aborted'}, msg)
155 # self.reply_socket.send(ident,zmq.SNDMORE)
154 # self.reply_socket.send(ident,zmq.SNDMORE)
156 # self.reply_socket.send_json(reply_msg)
155 # self.reply_socket.send_json(reply_msg)
157 reply_msg = self.session.send(stream, reply_type,
156 reply_msg = self.session.send(stream, reply_type,
158 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
157 content={'status' : 'aborted'}, parent=msg, ident=idents)[0]
159 self.log.debug(str(reply_msg))
158 self.log.debug(str(reply_msg))
160 # We need to wait a bit for requests to come in. This can probably
159 # We need to wait a bit for requests to come in. This can probably
161 # be set shorter for true asynchronous clients.
160 # be set shorter for true asynchronous clients.
162 time.sleep(0.05)
161 time.sleep(0.05)
163
162
164 def abort_request(self, stream, ident, parent):
163 def abort_request(self, stream, ident, parent):
165 """abort a specifig msg by id"""
164 """abort a specifig msg by id"""
166 msg_ids = parent['content'].get('msg_ids', None)
165 msg_ids = parent['content'].get('msg_ids', None)
167 if isinstance(msg_ids, basestring):
166 if isinstance(msg_ids, basestring):
168 msg_ids = [msg_ids]
167 msg_ids = [msg_ids]
169 if not msg_ids:
168 if not msg_ids:
170 self.abort_queues()
169 self.abort_queues()
171 for mid in msg_ids:
170 for mid in msg_ids:
172 self.aborted.add(str(mid))
171 self.aborted.add(str(mid))
173
172
174 content = dict(status='ok')
173 content = dict(status='ok')
175 reply_msg = self.session.send(stream, 'abort_reply', content=content,
174 reply_msg = self.session.send(stream, 'abort_reply', content=content,
176 parent=parent, ident=ident)
175 parent=parent, ident=ident)
177 self.log.debug(str(reply_msg))
176 self.log.debug(str(reply_msg))
178
177
179 def shutdown_request(self, stream, ident, parent):
178 def shutdown_request(self, stream, ident, parent):
180 """kill ourself. This should really be handled in an external process"""
179 """kill ourself. This should really be handled in an external process"""
181 try:
180 try:
182 self.abort_queues()
181 self.abort_queues()
183 except:
182 except:
184 content = self._wrap_exception('shutdown')
183 content = self._wrap_exception('shutdown')
185 else:
184 else:
186 content = dict(parent['content'])
185 content = dict(parent['content'])
187 content['status'] = 'ok'
186 content['status'] = 'ok'
188 msg = self.session.send(stream, 'shutdown_reply',
187 msg = self.session.send(stream, 'shutdown_reply',
189 content=content, parent=parent, ident=ident)
188 content=content, parent=parent, ident=ident)
190 self.log.debug(str(msg))
189 self.log.debug(str(msg))
191 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
190 dc = ioloop.DelayedCallback(lambda : sys.exit(0), 1000, self.loop)
192 dc.start()
191 dc.start()
193
192
194 def dispatch_control(self, msg):
193 def dispatch_control(self, msg):
195 idents,msg = self.session.feed_identities(msg, copy=False)
194 idents,msg = self.session.feed_identities(msg, copy=False)
196 try:
195 try:
197 msg = self.session.unpack_message(msg, content=True, copy=False)
196 msg = self.session.unpack_message(msg, content=True, copy=False)
198 except:
197 except:
199 self.log.error("Invalid Message", exc_info=True)
198 self.log.error("Invalid Message", exc_info=True)
200 return
199 return
201
200
202 header = msg['header']
201 header = msg['header']
203 msg_id = header['msg_id']
202 msg_id = header['msg_id']
204
203
205 handler = self.control_handlers.get(msg['msg_type'], None)
204 handler = self.control_handlers.get(msg['msg_type'], None)
206 if handler is None:
205 if handler is None:
207 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
206 self.log.error("UNKNOWN CONTROL MESSAGE TYPE: %r"%msg['msg_type'])
208 else:
207 else:
209 handler(self.control_stream, idents, msg)
208 handler(self.control_stream, idents, msg)
210
209
211
210
212 #-------------------- queue helpers ------------------------------
211 #-------------------- queue helpers ------------------------------
213
212
214 def check_dependencies(self, dependencies):
213 def check_dependencies(self, dependencies):
215 if not dependencies:
214 if not dependencies:
216 return True
215 return True
217 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
216 if len(dependencies) == 2 and dependencies[0] in 'any all'.split():
218 anyorall = dependencies[0]
217 anyorall = dependencies[0]
219 dependencies = dependencies[1]
218 dependencies = dependencies[1]
220 else:
219 else:
221 anyorall = 'all'
220 anyorall = 'all'
222 results = self.client.get_results(dependencies,status_only=True)
221 results = self.client.get_results(dependencies,status_only=True)
223 if results['status'] != 'ok':
222 if results['status'] != 'ok':
224 return False
223 return False
225
224
226 if anyorall == 'any':
225 if anyorall == 'any':
227 if not results['completed']:
226 if not results['completed']:
228 return False
227 return False
229 else:
228 else:
230 if results['pending']:
229 if results['pending']:
231 return False
230 return False
232
231
233 return True
232 return True
234
233
235 def check_aborted(self, msg_id):
234 def check_aborted(self, msg_id):
236 return msg_id in self.aborted
235 return msg_id in self.aborted
237
236
238 #-------------------- queue handlers -----------------------------
237 #-------------------- queue handlers -----------------------------
239
238
240 def clear_request(self, stream, idents, parent):
239 def clear_request(self, stream, idents, parent):
241 """Clear our namespace."""
240 """Clear our namespace."""
242 self.user_ns = {}
241 self.user_ns = {}
243 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
242 msg = self.session.send(stream, 'clear_reply', ident=idents, parent=parent,
244 content = dict(status='ok'))
243 content = dict(status='ok'))
245 self._initial_exec_lines()
244 self._initial_exec_lines()
246
245
247 def execute_request(self, stream, ident, parent):
246 def execute_request(self, stream, ident, parent):
248 self.log.debug('execute request %s'%parent)
247 self.log.debug('execute request %s'%parent)
249 try:
248 try:
250 code = parent[u'content'][u'code']
249 code = parent[u'content'][u'code']
251 except:
250 except:
252 self.log.error("Got bad msg: %s"%parent, exc_info=True)
251 self.log.error("Got bad msg: %s"%parent, exc_info=True)
253 return
252 return
254 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
253 self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent,
255 ident='%s.pyin'%self.prefix)
254 ident='%s.pyin'%self.prefix)
256 started = datetime.now().strftime(ISO8601)
255 started = datetime.now()
257 try:
256 try:
258 comp_code = self.compiler(code, '<zmq-kernel>')
257 comp_code = self.compiler(code, '<zmq-kernel>')
259 # allow for not overriding displayhook
258 # allow for not overriding displayhook
260 if hasattr(sys.displayhook, 'set_parent'):
259 if hasattr(sys.displayhook, 'set_parent'):
261 sys.displayhook.set_parent(parent)
260 sys.displayhook.set_parent(parent)
262 sys.stdout.set_parent(parent)
261 sys.stdout.set_parent(parent)
263 sys.stderr.set_parent(parent)
262 sys.stderr.set_parent(parent)
264 exec comp_code in self.user_ns, self.user_ns
263 exec comp_code in self.user_ns, self.user_ns
265 except:
264 except:
266 exc_content = self._wrap_exception('execute')
265 exc_content = self._wrap_exception('execute')
267 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
266 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
268 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
267 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
269 ident='%s.pyerr'%self.prefix)
268 ident='%s.pyerr'%self.prefix)
270 reply_content = exc_content
269 reply_content = exc_content
271 else:
270 else:
272 reply_content = {'status' : 'ok'}
271 reply_content = {'status' : 'ok'}
273
272
274 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
273 reply_msg = self.session.send(stream, u'execute_reply', reply_content, parent=parent,
275 ident=ident, subheader = dict(started=started))
274 ident=ident, subheader = dict(started=started))
276 self.log.debug(str(reply_msg))
275 self.log.debug(str(reply_msg))
277 if reply_msg['content']['status'] == u'error':
276 if reply_msg['content']['status'] == u'error':
278 self.abort_queues()
277 self.abort_queues()
279
278
280 def complete_request(self, stream, ident, parent):
279 def complete_request(self, stream, ident, parent):
281 matches = {'matches' : self.complete(parent),
280 matches = {'matches' : self.complete(parent),
282 'status' : 'ok'}
281 'status' : 'ok'}
283 completion_msg = self.session.send(stream, 'complete_reply',
282 completion_msg = self.session.send(stream, 'complete_reply',
284 matches, parent, ident)
283 matches, parent, ident)
285 # print >> sys.__stdout__, completion_msg
284 # print >> sys.__stdout__, completion_msg
286
285
287 def complete(self, msg):
286 def complete(self, msg):
288 return self.completer.complete(msg.content.line, msg.content.text)
287 return self.completer.complete(msg.content.line, msg.content.text)
289
288
290 def apply_request(self, stream, ident, parent):
289 def apply_request(self, stream, ident, parent):
291 # flush previous reply, so this request won't block it
290 # flush previous reply, so this request won't block it
292 stream.flush(zmq.POLLOUT)
291 stream.flush(zmq.POLLOUT)
293
292
294 try:
293 try:
295 content = parent[u'content']
294 content = parent[u'content']
296 bufs = parent[u'buffers']
295 bufs = parent[u'buffers']
297 msg_id = parent['header']['msg_id']
296 msg_id = parent['header']['msg_id']
298 # bound = parent['header'].get('bound', False)
297 # bound = parent['header'].get('bound', False)
299 except:
298 except:
300 self.log.error("Got bad msg: %s"%parent, exc_info=True)
299 self.log.error("Got bad msg: %s"%parent, exc_info=True)
301 return
300 return
302 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
301 # pyin_msg = self.session.msg(u'pyin',{u'code':code}, parent=parent)
303 # self.iopub_stream.send(pyin_msg)
302 # self.iopub_stream.send(pyin_msg)
304 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
303 # self.session.send(self.iopub_stream, u'pyin', {u'code':code},parent=parent)
305 sub = {'dependencies_met' : True, 'engine' : self.ident,
304 sub = {'dependencies_met' : True, 'engine' : self.ident,
306 'started': datetime.now().strftime(ISO8601)}
305 'started': datetime.now()}
307 try:
306 try:
308 # allow for not overriding displayhook
307 # allow for not overriding displayhook
309 if hasattr(sys.displayhook, 'set_parent'):
308 if hasattr(sys.displayhook, 'set_parent'):
310 sys.displayhook.set_parent(parent)
309 sys.displayhook.set_parent(parent)
311 sys.stdout.set_parent(parent)
310 sys.stdout.set_parent(parent)
312 sys.stderr.set_parent(parent)
311 sys.stderr.set_parent(parent)
313 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
312 # exec "f(*args,**kwargs)" in self.user_ns, self.user_ns
314 working = self.user_ns
313 working = self.user_ns
315 # suffix =
314 # suffix =
316 prefix = "_"+str(msg_id).replace("-","")+"_"
315 prefix = "_"+str(msg_id).replace("-","")+"_"
317
316
318 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
317 f,args,kwargs = unpack_apply_message(bufs, working, copy=False)
319 # if bound:
318 # if bound:
320 # bound_ns = Namespace(working)
319 # bound_ns = Namespace(working)
321 # args = [bound_ns]+list(args)
320 # args = [bound_ns]+list(args)
322
321
323 fname = getattr(f, '__name__', 'f')
322 fname = getattr(f, '__name__', 'f')
324
323
325 fname = prefix+"f"
324 fname = prefix+"f"
326 argname = prefix+"args"
325 argname = prefix+"args"
327 kwargname = prefix+"kwargs"
326 kwargname = prefix+"kwargs"
328 resultname = prefix+"result"
327 resultname = prefix+"result"
329
328
330 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
329 ns = { fname : f, argname : args, kwargname : kwargs , resultname : None }
331 # print ns
330 # print ns
332 working.update(ns)
331 working.update(ns)
333 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
332 code = "%s=%s(*%s,**%s)"%(resultname, fname, argname, kwargname)
334 try:
333 try:
335 exec code in working,working
334 exec code in working,working
336 result = working.get(resultname)
335 result = working.get(resultname)
337 finally:
336 finally:
338 for key in ns.iterkeys():
337 for key in ns.iterkeys():
339 working.pop(key)
338 working.pop(key)
340 # if bound:
339 # if bound:
341 # working.update(bound_ns)
340 # working.update(bound_ns)
342
341
343 packed_result,buf = serialize_object(result)
342 packed_result,buf = serialize_object(result)
344 result_buf = [packed_result]+buf
343 result_buf = [packed_result]+buf
345 except:
344 except:
346 exc_content = self._wrap_exception('apply')
345 exc_content = self._wrap_exception('apply')
347 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
346 # exc_msg = self.session.msg(u'pyerr', exc_content, parent)
348 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
347 self.session.send(self.iopub_stream, u'pyerr', exc_content, parent=parent,
349 ident='%s.pyerr'%self.prefix)
348 ident='%s.pyerr'%self.prefix)
350 reply_content = exc_content
349 reply_content = exc_content
351 result_buf = []
350 result_buf = []
352
351
353 if exc_content['ename'] == 'UnmetDependency':
352 if exc_content['ename'] == 'UnmetDependency':
354 sub['dependencies_met'] = False
353 sub['dependencies_met'] = False
355 else:
354 else:
356 reply_content = {'status' : 'ok'}
355 reply_content = {'status' : 'ok'}
357
356
358 # put 'ok'/'error' status in header, for scheduler introspection:
357 # put 'ok'/'error' status in header, for scheduler introspection:
359 sub['status'] = reply_content['status']
358 sub['status'] = reply_content['status']
360
359
361 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
360 reply_msg = self.session.send(stream, u'apply_reply', reply_content,
362 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
361 parent=parent, ident=ident,buffers=result_buf, subheader=sub)
363
362
364 # flush i/o
363 # flush i/o
365 # should this be before reply_msg is sent, like in the single-kernel code,
364 # should this be before reply_msg is sent, like in the single-kernel code,
366 # or should nothing get in the way of real results?
365 # or should nothing get in the way of real results?
367 sys.stdout.flush()
366 sys.stdout.flush()
368 sys.stderr.flush()
367 sys.stderr.flush()
369
368
370 def dispatch_queue(self, stream, msg):
369 def dispatch_queue(self, stream, msg):
371 self.control_stream.flush()
370 self.control_stream.flush()
372 idents,msg = self.session.feed_identities(msg, copy=False)
371 idents,msg = self.session.feed_identities(msg, copy=False)
373 try:
372 try:
374 msg = self.session.unpack_message(msg, content=True, copy=False)
373 msg = self.session.unpack_message(msg, content=True, copy=False)
375 except:
374 except:
376 self.log.error("Invalid Message", exc_info=True)
375 self.log.error("Invalid Message", exc_info=True)
377 return
376 return
378
377
379
378
380 header = msg['header']
379 header = msg['header']
381 msg_id = header['msg_id']
380 msg_id = header['msg_id']
382 if self.check_aborted(msg_id):
381 if self.check_aborted(msg_id):
383 self.aborted.remove(msg_id)
382 self.aborted.remove(msg_id)
384 # is it safe to assume a msg_id will not be resubmitted?
383 # is it safe to assume a msg_id will not be resubmitted?
385 reply_type = msg['msg_type'].split('_')[0] + '_reply'
384 reply_type = msg['msg_type'].split('_')[0] + '_reply'
386 status = {'status' : 'aborted'}
385 status = {'status' : 'aborted'}
387 reply_msg = self.session.send(stream, reply_type, subheader=status,
386 reply_msg = self.session.send(stream, reply_type, subheader=status,
388 content=status, parent=msg, ident=idents)
387 content=status, parent=msg, ident=idents)
389 return
388 return
390 handler = self.shell_handlers.get(msg['msg_type'], None)
389 handler = self.shell_handlers.get(msg['msg_type'], None)
391 if handler is None:
390 if handler is None:
392 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
391 self.log.error("UNKNOWN MESSAGE TYPE: %r"%msg['msg_type'])
393 else:
392 else:
394 handler(stream, idents, msg)
393 handler(stream, idents, msg)
395
394
396 def start(self):
395 def start(self):
397 #### stream mode:
396 #### stream mode:
398 if self.control_stream:
397 if self.control_stream:
399 self.control_stream.on_recv(self.dispatch_control, copy=False)
398 self.control_stream.on_recv(self.dispatch_control, copy=False)
400 self.control_stream.on_err(printer)
399 self.control_stream.on_err(printer)
401
400
402 def make_dispatcher(stream):
401 def make_dispatcher(stream):
403 def dispatcher(msg):
402 def dispatcher(msg):
404 return self.dispatch_queue(stream, msg)
403 return self.dispatch_queue(stream, msg)
405 return dispatcher
404 return dispatcher
406
405
407 for s in self.shell_streams:
406 for s in self.shell_streams:
408 s.on_recv(make_dispatcher(s), copy=False)
407 s.on_recv(make_dispatcher(s), copy=False)
409 s.on_err(printer)
408 s.on_err(printer)
410
409
411 if self.iopub_stream:
410 if self.iopub_stream:
412 self.iopub_stream.on_err(printer)
411 self.iopub_stream.on_err(printer)
413
412
414 #### while True mode:
413 #### while True mode:
415 # while True:
414 # while True:
416 # idle = True
415 # idle = True
417 # try:
416 # try:
418 # msg = self.shell_stream.socket.recv_multipart(
417 # msg = self.shell_stream.socket.recv_multipart(
419 # zmq.NOBLOCK, copy=False)
418 # zmq.NOBLOCK, copy=False)
420 # except zmq.ZMQError, e:
419 # except zmq.ZMQError, e:
421 # if e.errno != zmq.EAGAIN:
420 # if e.errno != zmq.EAGAIN:
422 # raise e
421 # raise e
423 # else:
422 # else:
424 # idle=False
423 # idle=False
425 # self.dispatch_queue(self.shell_stream, msg)
424 # self.dispatch_queue(self.shell_stream, msg)
426 #
425 #
427 # if not self.task_stream.empty():
426 # if not self.task_stream.empty():
428 # idle=False
427 # idle=False
429 # msg = self.task_stream.recv_multipart()
428 # msg = self.task_stream.recv_multipart()
430 # self.dispatch_queue(self.task_stream, msg)
429 # self.dispatch_queue(self.task_stream, msg)
431 # if idle:
430 # if idle:
432 # # don't busywait
431 # # don't busywait
433 # time.sleep(1e-3)
432 # time.sleep(1e-3)
434
433
@@ -1,121 +1,132 b''
1 """Utilities to manipulate JSON objects.
1 """Utilities to manipulate JSON objects.
2 """
2 """
3 #-----------------------------------------------------------------------------
3 #-----------------------------------------------------------------------------
4 # Copyright (C) 2010 The IPython Development Team
4 # Copyright (C) 2010 The IPython Development Team
5 #
5 #
6 # Distributed under the terms of the BSD License. The full license is in
6 # Distributed under the terms of the BSD License. The full license is in
7 # the file COPYING.txt, distributed as part of this software.
7 # the file COPYING.txt, distributed as part of this software.
8 #-----------------------------------------------------------------------------
8 #-----------------------------------------------------------------------------
9
9
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11 # Imports
11 # Imports
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # stdlib
13 # stdlib
14 import re
14 import re
15 import types
15 import types
16 from datetime import datetime
16 from datetime import datetime
17
17
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19 # Globals and constants
19 # Globals and constants
20 #-----------------------------------------------------------------------------
20 #-----------------------------------------------------------------------------
21
21
22 # timestamp formats
22 # timestamp formats
23 ISO8601="%Y-%m-%dT%H:%M:%S.%f"
23 ISO8601="%Y-%m-%dT%H:%M:%S.%f"
24 ISO8601_PAT=re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+$")
24 ISO8601_PAT=re.compile(r"^\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d+$")
25
25
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27 # Classes and functions
27 # Classes and functions
28 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
29
29
30 def extract_dates(obj):
30 def extract_dates(obj):
31 """extract ISO8601 dates from unpacked JSON"""
31 """extract ISO8601 dates from unpacked JSON"""
32 if isinstance(obj, dict):
32 if isinstance(obj, dict):
33 for k,v in obj.iteritems():
33 for k,v in obj.iteritems():
34 obj[k] = extract_dates(v)
34 obj[k] = extract_dates(v)
35 elif isinstance(obj, list):
35 elif isinstance(obj, (list, tuple)):
36 obj = [ extract_dates(o) for o in obj ]
36 obj = [ extract_dates(o) for o in obj ]
37 elif isinstance(obj, basestring):
37 elif isinstance(obj, basestring):
38 if ISO8601_PAT.match(obj):
38 if ISO8601_PAT.match(obj):
39 obj = datetime.strptime(obj, ISO8601)
39 obj = datetime.strptime(obj, ISO8601)
40 return obj
40 return obj
41
41
42 def squash_dates(obj):
43 """squash datetime objects into ISO8601 strings"""
44 if isinstance(obj, dict):
45 for k,v in obj.iteritems():
46 obj[k] = squash_dates(v)
47 elif isinstance(obj, (list, tuple)):
48 obj = [ squash_dates(o) for o in obj ]
49 elif isinstance(obj, datetime):
50 obj = obj.strftime(ISO8601)
51 return obj
52
42 def date_default(obj):
53 def date_default(obj):
43 """default function for packing datetime objects"""
54 """default function for packing datetime objects in JSON."""
44 if isinstance(obj, datetime):
55 if isinstance(obj, datetime):
45 return obj.strftime(ISO8601)
56 return obj.strftime(ISO8601)
46 else:
57 else:
47 raise TypeError("%r is not JSON serializable"%obj)
58 raise TypeError("%r is not JSON serializable"%obj)
48
59
49
60
50
61
51 def json_clean(obj):
62 def json_clean(obj):
52 """Clean an object to ensure it's safe to encode in JSON.
63 """Clean an object to ensure it's safe to encode in JSON.
53
64
54 Atomic, immutable objects are returned unmodified. Sets and tuples are
65 Atomic, immutable objects are returned unmodified. Sets and tuples are
55 converted to lists, lists are copied and dicts are also copied.
66 converted to lists, lists are copied and dicts are also copied.
56
67
57 Note: dicts whose keys could cause collisions upon encoding (such as a dict
68 Note: dicts whose keys could cause collisions upon encoding (such as a dict
58 with both the number 1 and the string '1' as keys) will cause a ValueError
69 with both the number 1 and the string '1' as keys) will cause a ValueError
59 to be raised.
70 to be raised.
60
71
61 Parameters
72 Parameters
62 ----------
73 ----------
63 obj : any python object
74 obj : any python object
64
75
65 Returns
76 Returns
66 -------
77 -------
67 out : object
78 out : object
68
79
69 A version of the input which will not cause an encoding error when
80 A version of the input which will not cause an encoding error when
70 encoded as JSON. Note that this function does not *encode* its inputs,
81 encoded as JSON. Note that this function does not *encode* its inputs,
71 it simply sanitizes it so that there will be no encoding errors later.
82 it simply sanitizes it so that there will be no encoding errors later.
72
83
73 Examples
84 Examples
74 --------
85 --------
75 >>> json_clean(4)
86 >>> json_clean(4)
76 4
87 4
77 >>> json_clean(range(10))
88 >>> json_clean(range(10))
78 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
89 [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
79 >>> json_clean(dict(x=1, y=2))
90 >>> json_clean(dict(x=1, y=2))
80 {'y': 2, 'x': 1}
91 {'y': 2, 'x': 1}
81 >>> json_clean(dict(x=1, y=2, z=[1,2,3]))
92 >>> json_clean(dict(x=1, y=2, z=[1,2,3]))
82 {'y': 2, 'x': 1, 'z': [1, 2, 3]}
93 {'y': 2, 'x': 1, 'z': [1, 2, 3]}
83 >>> json_clean(True)
94 >>> json_clean(True)
84 True
95 True
85 """
96 """
86 # types that are 'atomic' and ok in json as-is. bool doesn't need to be
97 # types that are 'atomic' and ok in json as-is. bool doesn't need to be
87 # listed explicitly because bools pass as int instances
98 # listed explicitly because bools pass as int instances
88 atomic_ok = (basestring, int, float, types.NoneType)
99 atomic_ok = (basestring, int, float, types.NoneType)
89
100
90 # containers that we need to convert into lists
101 # containers that we need to convert into lists
91 container_to_list = (tuple, set, types.GeneratorType)
102 container_to_list = (tuple, set, types.GeneratorType)
92
103
93 if isinstance(obj, atomic_ok):
104 if isinstance(obj, atomic_ok):
94 return obj
105 return obj
95
106
96 if isinstance(obj, container_to_list) or (
107 if isinstance(obj, container_to_list) or (
97 hasattr(obj, '__iter__') and hasattr(obj, 'next')):
108 hasattr(obj, '__iter__') and hasattr(obj, 'next')):
98 obj = list(obj)
109 obj = list(obj)
99
110
100 if isinstance(obj, list):
111 if isinstance(obj, list):
101 return [json_clean(x) for x in obj]
112 return [json_clean(x) for x in obj]
102
113
103 if isinstance(obj, dict):
114 if isinstance(obj, dict):
104 # First, validate that the dict won't lose data in conversion due to
115 # First, validate that the dict won't lose data in conversion due to
105 # key collisions after stringification. This can happen with keys like
116 # key collisions after stringification. This can happen with keys like
106 # True and 'true' or 1 and '1', which collide in JSON.
117 # True and 'true' or 1 and '1', which collide in JSON.
107 nkeys = len(obj)
118 nkeys = len(obj)
108 nkeys_collapsed = len(set(map(str, obj)))
119 nkeys_collapsed = len(set(map(str, obj)))
109 if nkeys != nkeys_collapsed:
120 if nkeys != nkeys_collapsed:
110 raise ValueError('dict can not be safely converted to JSON: '
121 raise ValueError('dict can not be safely converted to JSON: '
111 'key collision would lead to dropped values')
122 'key collision would lead to dropped values')
112 # If all OK, proceed by making the new dict that will be json-safe
123 # If all OK, proceed by making the new dict that will be json-safe
113 out = {}
124 out = {}
114 for k,v in obj.iteritems():
125 for k,v in obj.iteritems():
115 out[str(k)] = json_clean(v)
126 out[str(k)] = json_clean(v)
116 return out
127 return out
117
128
118 # If we get here, we don't know how to handle the object, so we just get
129 # If we get here, we don't know how to handle the object, so we just get
119 # its repr and return that. This will catch lambdas, open sockets, class
130 # its repr and return that. This will catch lambdas, open sockets, class
120 # objects, and any other complicated contraption that json can't encode
131 # objects, and any other complicated contraption that json can't encode
121 return repr(obj)
132 return repr(obj)
@@ -1,511 +1,535 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """edited session.py to work with streams, and move msg_type to the header
2 """edited session.py to work with streams, and move msg_type to the header
3 """
3 """
4 #-----------------------------------------------------------------------------
4 #-----------------------------------------------------------------------------
5 # Copyright (C) 2010-2011 The IPython Development Team
5 # Copyright (C) 2010-2011 The IPython Development Team
6 #
6 #
7 # Distributed under the terms of the BSD License. The full license is in
7 # Distributed under the terms of the BSD License. The full license is in
8 # the file COPYING, distributed as part of this software.
8 # the file COPYING, distributed as part of this software.
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10
10
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12 # Imports
12 # Imports
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14
14
15 import hmac
15 import hmac
16 import logging
16 import logging
17 import os
17 import os
18 import pprint
18 import pprint
19 import uuid
19 import uuid
20 from datetime import datetime
20 from datetime import datetime
21
21
22 try:
22 try:
23 import cPickle
23 import cPickle
24 pickle = cPickle
24 pickle = cPickle
25 except:
25 except:
26 cPickle = None
26 cPickle = None
27 import pickle
27 import pickle
28
28
29 import zmq
29 import zmq
30 from zmq.utils import jsonapi
30 from zmq.utils import jsonapi
31 from zmq.eventloop.ioloop import IOLoop
31 from zmq.eventloop.ioloop import IOLoop
32 from zmq.eventloop.zmqstream import ZMQStream
32 from zmq.eventloop.zmqstream import ZMQStream
33
33
34 from IPython.config.configurable import Configurable
34 from IPython.config.configurable import Configurable
35 from IPython.utils.importstring import import_item
35 from IPython.utils.importstring import import_item
36 from IPython.utils.jsonutil import date_default
36 from IPython.utils.jsonutil import extract_dates, squash_dates, date_default
37 from IPython.utils.traitlets import CStr, Unicode, Bool, Any, Instance, Set
37 from IPython.utils.traitlets import CStr, Unicode, Bool, Any, Instance, Set
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # utility functions
40 # utility functions
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43 def squash_unicode(obj):
43 def squash_unicode(obj):
44 """coerce unicode back to bytestrings."""
44 """coerce unicode back to bytestrings."""
45 if isinstance(obj,dict):
45 if isinstance(obj,dict):
46 for key in obj.keys():
46 for key in obj.keys():
47 obj[key] = squash_unicode(obj[key])
47 obj[key] = squash_unicode(obj[key])
48 if isinstance(key, unicode):
48 if isinstance(key, unicode):
49 obj[squash_unicode(key)] = obj.pop(key)
49 obj[squash_unicode(key)] = obj.pop(key)
50 elif isinstance(obj, list):
50 elif isinstance(obj, list):
51 for i,v in enumerate(obj):
51 for i,v in enumerate(obj):
52 obj[i] = squash_unicode(v)
52 obj[i] = squash_unicode(v)
53 elif isinstance(obj, unicode):
53 elif isinstance(obj, unicode):
54 obj = obj.encode('utf8')
54 obj = obj.encode('utf8')
55 return obj
55 return obj
56
56
57 #-----------------------------------------------------------------------------
57 #-----------------------------------------------------------------------------
58 # globals and defaults
58 # globals and defaults
59 #-----------------------------------------------------------------------------
59 #-----------------------------------------------------------------------------
60
60 key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
61 _default_key = 'on_unknown' if jsonapi.jsonmod.__name__ == 'jsonlib' else 'default'
61 json_packer = lambda obj: jsonapi.dumps(obj, **{key:date_default})
62 json_packer = lambda obj: jsonapi.dumps(obj, **{_default_key:date_default})
62 json_unpacker = lambda s: squash_unicode(extract_dates(jsonapi.loads(s)))
63 json_unpacker = lambda s: squash_unicode(jsonapi.loads(s))
64
63
65 pickle_packer = lambda o: pickle.dumps(o,-1)
64 pickle_packer = lambda o: pickle.dumps(o,-1)
66 pickle_unpacker = pickle.loads
65 pickle_unpacker = pickle.loads
67
66
68 default_packer = json_packer
67 default_packer = json_packer
69 default_unpacker = json_unpacker
68 default_unpacker = json_unpacker
70
69
71
70
72 DELIM="<IDS|MSG>"
71 DELIM="<IDS|MSG>"
73
72
74 #-----------------------------------------------------------------------------
73 #-----------------------------------------------------------------------------
75 # Classes
74 # Classes
76 #-----------------------------------------------------------------------------
75 #-----------------------------------------------------------------------------
77
76
78 class SessionFactory(Configurable):
77 class SessionFactory(Configurable):
79 """The Base class for configurables that have a Session, Context, logger,
78 """The Base class for configurables that have a Session, Context, logger,
80 and IOLoop.
79 and IOLoop.
81 """
80 """
82
81
83 log = Instance('logging.Logger', ('', logging.WARN))
82 log = Instance('logging.Logger', ('', logging.WARN))
84
83
85 logname = Unicode('')
84 logname = Unicode('')
86 def _logname_changed(self, name, old, new):
85 def _logname_changed(self, name, old, new):
87 self.log = logging.getLogger(new)
86 self.log = logging.getLogger(new)
88
87
89 # not configurable:
88 # not configurable:
90 context = Instance('zmq.Context')
89 context = Instance('zmq.Context')
91 def _context_default(self):
90 def _context_default(self):
92 return zmq.Context.instance()
91 return zmq.Context.instance()
93
92
94 session = Instance('IPython.zmq.session.Session')
93 session = Instance('IPython.zmq.session.Session')
95
94
96 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
95 loop = Instance('zmq.eventloop.ioloop.IOLoop', allow_none=False)
97 def _loop_default(self):
96 def _loop_default(self):
98 return IOLoop.instance()
97 return IOLoop.instance()
99
98
100 def __init__(self, **kwargs):
99 def __init__(self, **kwargs):
101 super(SessionFactory, self).__init__(**kwargs)
100 super(SessionFactory, self).__init__(**kwargs)
102
101
103 if self.session is None:
102 if self.session is None:
104 # construct the session
103 # construct the session
105 self.session = Session(**kwargs)
104 self.session = Session(**kwargs)
106
105
107
106
108 class Message(object):
107 class Message(object):
109 """A simple message object that maps dict keys to attributes.
108 """A simple message object that maps dict keys to attributes.
110
109
111 A Message can be created from a dict and a dict from a Message instance
110 A Message can be created from a dict and a dict from a Message instance
112 simply by calling dict(msg_obj)."""
111 simply by calling dict(msg_obj)."""
113
112
114 def __init__(self, msg_dict):
113 def __init__(self, msg_dict):
115 dct = self.__dict__
114 dct = self.__dict__
116 for k, v in dict(msg_dict).iteritems():
115 for k, v in dict(msg_dict).iteritems():
117 if isinstance(v, dict):
116 if isinstance(v, dict):
118 v = Message(v)
117 v = Message(v)
119 dct[k] = v
118 dct[k] = v
120
119
121 # Having this iterator lets dict(msg_obj) work out of the box.
120 # Having this iterator lets dict(msg_obj) work out of the box.
122 def __iter__(self):
121 def __iter__(self):
123 return iter(self.__dict__.iteritems())
122 return iter(self.__dict__.iteritems())
124
123
125 def __repr__(self):
124 def __repr__(self):
126 return repr(self.__dict__)
125 return repr(self.__dict__)
127
126
128 def __str__(self):
127 def __str__(self):
129 return pprint.pformat(self.__dict__)
128 return pprint.pformat(self.__dict__)
130
129
131 def __contains__(self, k):
130 def __contains__(self, k):
132 return k in self.__dict__
131 return k in self.__dict__
133
132
134 def __getitem__(self, k):
133 def __getitem__(self, k):
135 return self.__dict__[k]
134 return self.__dict__[k]
136
135
137
136
138 def msg_header(msg_id, msg_type, username, session):
137 def msg_header(msg_id, msg_type, username, session):
139 date=datetime.now()
138 date = datetime.now()
140 return locals()
139 return locals()
141
140
142 def extract_header(msg_or_header):
141 def extract_header(msg_or_header):
143 """Given a message or header, return the header."""
142 """Given a message or header, return the header."""
144 if not msg_or_header:
143 if not msg_or_header:
145 return {}
144 return {}
146 try:
145 try:
147 # See if msg_or_header is the entire message.
146 # See if msg_or_header is the entire message.
148 h = msg_or_header['header']
147 h = msg_or_header['header']
149 except KeyError:
148 except KeyError:
150 try:
149 try:
151 # See if msg_or_header is just the header
150 # See if msg_or_header is just the header
152 h = msg_or_header['msg_id']
151 h = msg_or_header['msg_id']
153 except KeyError:
152 except KeyError:
154 raise
153 raise
155 else:
154 else:
156 h = msg_or_header
155 h = msg_or_header
157 if not isinstance(h, dict):
156 if not isinstance(h, dict):
158 h = dict(h)
157 h = dict(h)
159 return h
158 return h
160
159
161 class Session(Configurable):
160 class Session(Configurable):
162 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
161 """tweaked version of IPython.zmq.session.Session, for development in Parallel"""
163 debug=Bool(False, config=True, help="""Debug output in the Session""")
162 debug=Bool(False, config=True, help="""Debug output in the Session""")
164 packer = Unicode('json',config=True,
163 packer = Unicode('json',config=True,
165 help="""The name of the packer for serializing messages.
164 help="""The name of the packer for serializing messages.
166 Should be one of 'json', 'pickle', or an import name
165 Should be one of 'json', 'pickle', or an import name
167 for a custom serializer.""")
166 for a custom callable serializer.""")
168 def _packer_changed(self, name, old, new):
167 def _packer_changed(self, name, old, new):
169 if new.lower() == 'json':
168 if new.lower() == 'json':
170 self.pack = json_packer
169 self.pack = json_packer
171 self.unpack = json_unpacker
170 self.unpack = json_unpacker
172 elif new.lower() == 'pickle':
171 elif new.lower() == 'pickle':
173 self.pack = pickle_packer
172 self.pack = pickle_packer
174 self.unpack = pickle_unpacker
173 self.unpack = pickle_unpacker
175 else:
174 else:
176 self.pack = import_item(new)
175 self.pack = import_item(str(new))
177
176
178 unpacker = Unicode('json', config=True,
177 unpacker = Unicode('json', config=True,
179 help="""The name of the unpacker for unserializing messages.
178 help="""The name of the unpacker for unserializing messages.
180 Only used with custom functions for `packer`.""")
179 Only used with custom functions for `packer`.""")
181 def _unpacker_changed(self, name, old, new):
180 def _unpacker_changed(self, name, old, new):
182 if new.lower() == 'json':
181 if new.lower() == 'json':
183 self.pack = json_packer
182 self.pack = json_packer
184 self.unpack = json_unpacker
183 self.unpack = json_unpacker
185 elif new.lower() == 'pickle':
184 elif new.lower() == 'pickle':
186 self.pack = pickle_packer
185 self.pack = pickle_packer
187 self.unpack = pickle_unpacker
186 self.unpack = pickle_unpacker
188 else:
187 else:
189 self.unpack = import_item(new)
188 self.unpack = import_item(str(new))
190
189
191 session = CStr('', config=True,
190 session = CStr('', config=True,
192 help="""The UUID identifying this session.""")
191 help="""The UUID identifying this session.""")
193 def _session_default(self):
192 def _session_default(self):
194 return bytes(uuid.uuid4())
193 return bytes(uuid.uuid4())
195 username = Unicode(os.environ.get('USER','username'), config=True,
194 username = Unicode(os.environ.get('USER','username'), config=True,
196 help="""Username for the Session. Default is your system username.""")
195 help="""Username for the Session. Default is your system username.""")
197
196
198 # message signature related traits:
197 # message signature related traits:
199 key = CStr('', config=True,
198 key = CStr('', config=True,
200 help="""execution key, for extra authentication.""")
199 help="""execution key, for extra authentication.""")
201 def _key_changed(self, name, old, new):
200 def _key_changed(self, name, old, new):
202 if new:
201 if new:
203 self.auth = hmac.HMAC(new)
202 self.auth = hmac.HMAC(new)
204 else:
203 else:
205 self.auth = None
204 self.auth = None
206 auth = Instance(hmac.HMAC)
205 auth = Instance(hmac.HMAC)
207 counters = Instance('collections.defaultdict', (int,))
206 counters = Instance('collections.defaultdict', (int,))
208 digest_history = Set()
207 digest_history = Set()
209
208
210 keyfile = Unicode('', config=True,
209 keyfile = Unicode('', config=True,
211 help="""path to file containing execution key.""")
210 help="""path to file containing execution key.""")
212 def _keyfile_changed(self, name, old, new):
211 def _keyfile_changed(self, name, old, new):
213 with open(new, 'rb') as f:
212 with open(new, 'rb') as f:
214 self.key = f.read().strip()
213 self.key = f.read().strip()
215
214
216 pack = Any(default_packer) # the actual packer function
215 pack = Any(default_packer) # the actual packer function
217 def _pack_changed(self, name, old, new):
216 def _pack_changed(self, name, old, new):
218 if not callable(new):
217 if not callable(new):
219 raise TypeError("packer must be callable, not %s"%type(new))
218 raise TypeError("packer must be callable, not %s"%type(new))
220
219
221 unpack = Any(default_unpacker) # the actual packer function
220 unpack = Any(default_unpacker) # the actual packer function
222 def _unpack_changed(self, name, old, new):
221 def _unpack_changed(self, name, old, new):
222 # unpacker is not checked - it is assumed to be
223 if not callable(new):
223 if not callable(new):
224 raise TypeError("packer must be callable, not %s"%type(new))
224 raise TypeError("unpacker must be callable, not %s"%type(new))
225
225
226 def __init__(self, **kwargs):
226 def __init__(self, **kwargs):
227 super(Session, self).__init__(**kwargs)
227 super(Session, self).__init__(**kwargs)
228 self._check_packers()
228 self.none = self.pack({})
229 self.none = self.pack({})
229
230
230 @property
231 @property
231 def msg_id(self):
232 def msg_id(self):
232 """always return new uuid"""
233 """always return new uuid"""
233 return str(uuid.uuid4())
234 return str(uuid.uuid4())
234
235
236 def _check_packers(self):
237 """check packers for binary data and datetime support."""
238 pack = self.pack
239 unpack = self.unpack
240
241 # check simple serialization
242 msg = dict(a=[1,'hi'])
243 try:
244 packed = pack(msg)
245 except Exception:
246 raise ValueError("packer could not serialize a simple message")
247
248 # ensure packed message is bytes
249 if not isinstance(packed, bytes):
250 raise ValueError("message packed to %r, but bytes are required"%type(packed))
251
252 # check that unpack is pack's inverse
253 try:
254 unpacked = unpack(packed)
255 except Exception:
256 raise ValueError("unpacker could not handle the packer's output")
257
258 # check datetime support
259 msg = dict(t=datetime.now())
260 try:
261 unpacked = unpack(pack(msg))
262 except Exception:
263 self.pack = lambda o: pack(squash_dates(o))
264 self.unpack = lambda s: extract_dates(unpack(s))
265
235 def msg_header(self, msg_type):
266 def msg_header(self, msg_type):
236 return msg_header(self.msg_id, msg_type, self.username, self.session)
267 return msg_header(self.msg_id, msg_type, self.username, self.session)
237
268
238 def msg(self, msg_type, content=None, parent=None, subheader=None):
269 def msg(self, msg_type, content=None, parent=None, subheader=None):
239 msg = {}
270 msg = {}
240 msg['header'] = self.msg_header(msg_type)
271 msg['header'] = self.msg_header(msg_type)
241 msg['msg_id'] = msg['header']['msg_id']
272 msg['msg_id'] = msg['header']['msg_id']
242 msg['parent_header'] = {} if parent is None else extract_header(parent)
273 msg['parent_header'] = {} if parent is None else extract_header(parent)
243 msg['msg_type'] = msg_type
274 msg['msg_type'] = msg_type
244 msg['content'] = {} if content is None else content
275 msg['content'] = {} if content is None else content
245 sub = {} if subheader is None else subheader
276 sub = {} if subheader is None else subheader
246 msg['header'].update(sub)
277 msg['header'].update(sub)
247 return msg
278 return msg
248
279
249 def check_key(self, msg_or_header):
250 """Check that a message's header has the right key"""
251 if not self.key:
252 return True
253 header = extract_header(msg_or_header)
254 return header.get('key', '') == self.key
255
256 def sign(self, msg):
280 def sign(self, msg):
257 """Sign a message with HMAC digest. If no auth, return b''."""
281 """Sign a message with HMAC digest. If no auth, return b''."""
258 if self.auth is None:
282 if self.auth is None:
259 return b''
283 return b''
260 h = self.auth.copy()
284 h = self.auth.copy()
261 for m in msg:
285 for m in msg:
262 h.update(m)
286 h.update(m)
263 return h.hexdigest()
287 return h.hexdigest()
264
288
265 def serialize(self, msg, ident=None):
289 def serialize(self, msg, ident=None):
266 content = msg.get('content', {})
290 content = msg.get('content', {})
267 if content is None:
291 if content is None:
268 content = self.none
292 content = self.none
269 elif isinstance(content, dict):
293 elif isinstance(content, dict):
270 content = self.pack(content)
294 content = self.pack(content)
271 elif isinstance(content, bytes):
295 elif isinstance(content, bytes):
272 # content is already packed, as in a relayed message
296 # content is already packed, as in a relayed message
273 pass
297 pass
274 elif isinstance(content, unicode):
298 elif isinstance(content, unicode):
275 # should be bytes, but JSON often spits out unicode
299 # should be bytes, but JSON often spits out unicode
276 content = content.encode('utf8')
300 content = content.encode('utf8')
277 else:
301 else:
278 raise TypeError("Content incorrect type: %s"%type(content))
302 raise TypeError("Content incorrect type: %s"%type(content))
279
303
280 real_message = [self.pack(msg['header']),
304 real_message = [self.pack(msg['header']),
281 self.pack(msg['parent_header']),
305 self.pack(msg['parent_header']),
282 content
306 content
283 ]
307 ]
284
308
285 to_send = []
309 to_send = []
286
310
287 if isinstance(ident, list):
311 if isinstance(ident, list):
288 # accept list of idents
312 # accept list of idents
289 to_send.extend(ident)
313 to_send.extend(ident)
290 elif ident is not None:
314 elif ident is not None:
291 to_send.append(ident)
315 to_send.append(ident)
292 to_send.append(DELIM)
316 to_send.append(DELIM)
293
317
294 signature = self.sign(real_message)
318 signature = self.sign(real_message)
295 to_send.append(signature)
319 to_send.append(signature)
296
320
297 to_send.extend(real_message)
321 to_send.extend(real_message)
298
322
299 return to_send
323 return to_send
300
324
301 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
325 def send(self, stream, msg_or_type, content=None, parent=None, ident=None,
302 buffers=None, subheader=None, track=False):
326 buffers=None, subheader=None, track=False):
303 """Build and send a message via stream or socket.
327 """Build and send a message via stream or socket.
304
328
305 Parameters
329 Parameters
306 ----------
330 ----------
307
331
308 stream : zmq.Socket or ZMQStream
332 stream : zmq.Socket or ZMQStream
309 the socket-like object used to send the data
333 the socket-like object used to send the data
310 msg_or_type : str or Message/dict
334 msg_or_type : str or Message/dict
311 Normally, msg_or_type will be a msg_type unless a message is being sent more
335 Normally, msg_or_type will be a msg_type unless a message is being sent more
312 than once.
336 than once.
313
337
314 content : dict or None
338 content : dict or None
315 the content of the message (ignored if msg_or_type is a message)
339 the content of the message (ignored if msg_or_type is a message)
316 parent : Message or dict or None
340 parent : Message or dict or None
317 the parent or parent header describing the parent of this message
341 the parent or parent header describing the parent of this message
318 ident : bytes or list of bytes
342 ident : bytes or list of bytes
319 the zmq.IDENTITY routing path
343 the zmq.IDENTITY routing path
320 subheader : dict or None
344 subheader : dict or None
321 extra header keys for this message's header
345 extra header keys for this message's header
322 buffers : list or None
346 buffers : list or None
323 the already-serialized buffers to be appended to the message
347 the already-serialized buffers to be appended to the message
324 track : bool
348 track : bool
325 whether to track. Only for use with Sockets,
349 whether to track. Only for use with Sockets,
326 because ZMQStream objects cannot track messages.
350 because ZMQStream objects cannot track messages.
327
351
328 Returns
352 Returns
329 -------
353 -------
330 msg : message dict
354 msg : message dict
331 the constructed message
355 the constructed message
332 (msg,tracker) : (message dict, MessageTracker)
356 (msg,tracker) : (message dict, MessageTracker)
333 if track=True, then a 2-tuple will be returned,
357 if track=True, then a 2-tuple will be returned,
334 the first element being the constructed
358 the first element being the constructed
335 message, and the second being the MessageTracker
359 message, and the second being the MessageTracker
336
360
337 """
361 """
338
362
339 if not isinstance(stream, (zmq.Socket, ZMQStream)):
363 if not isinstance(stream, (zmq.Socket, ZMQStream)):
340 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
364 raise TypeError("stream must be Socket or ZMQStream, not %r"%type(stream))
341 elif track and isinstance(stream, ZMQStream):
365 elif track and isinstance(stream, ZMQStream):
342 raise TypeError("ZMQStream cannot track messages")
366 raise TypeError("ZMQStream cannot track messages")
343
367
344 if isinstance(msg_or_type, (Message, dict)):
368 if isinstance(msg_or_type, (Message, dict)):
345 # we got a Message, not a msg_type
369 # we got a Message, not a msg_type
346 # don't build a new Message
370 # don't build a new Message
347 msg = msg_or_type
371 msg = msg_or_type
348 else:
372 else:
349 msg = self.msg(msg_or_type, content, parent, subheader)
373 msg = self.msg(msg_or_type, content, parent, subheader)
350
374
351 buffers = [] if buffers is None else buffers
375 buffers = [] if buffers is None else buffers
352 to_send = self.serialize(msg, ident)
376 to_send = self.serialize(msg, ident)
353 flag = 0
377 flag = 0
354 if buffers:
378 if buffers:
355 flag = zmq.SNDMORE
379 flag = zmq.SNDMORE
356 _track = False
380 _track = False
357 else:
381 else:
358 _track=track
382 _track=track
359 if track:
383 if track:
360 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
384 tracker = stream.send_multipart(to_send, flag, copy=False, track=_track)
361 else:
385 else:
362 tracker = stream.send_multipart(to_send, flag, copy=False)
386 tracker = stream.send_multipart(to_send, flag, copy=False)
363 for b in buffers[:-1]:
387 for b in buffers[:-1]:
364 stream.send(b, flag, copy=False)
388 stream.send(b, flag, copy=False)
365 if buffers:
389 if buffers:
366 if track:
390 if track:
367 tracker = stream.send(buffers[-1], copy=False, track=track)
391 tracker = stream.send(buffers[-1], copy=False, track=track)
368 else:
392 else:
369 tracker = stream.send(buffers[-1], copy=False)
393 tracker = stream.send(buffers[-1], copy=False)
370
394
371 # omsg = Message(msg)
395 # omsg = Message(msg)
372 if self.debug:
396 if self.debug:
373 pprint.pprint(msg)
397 pprint.pprint(msg)
374 pprint.pprint(to_send)
398 pprint.pprint(to_send)
375 pprint.pprint(buffers)
399 pprint.pprint(buffers)
376
400
377 msg['tracker'] = tracker
401 msg['tracker'] = tracker
378
402
379 return msg
403 return msg
380
404
381 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
405 def send_raw(self, stream, msg, flags=0, copy=True, ident=None):
382 """Send a raw message via ident path.
406 """Send a raw message via ident path.
383
407
384 Parameters
408 Parameters
385 ----------
409 ----------
386 msg : list of sendable buffers"""
410 msg : list of sendable buffers"""
387 to_send = []
411 to_send = []
388 if isinstance(ident, bytes):
412 if isinstance(ident, bytes):
389 ident = [ident]
413 ident = [ident]
390 if ident is not None:
414 if ident is not None:
391 to_send.extend(ident)
415 to_send.extend(ident)
392
416
393 to_send.append(DELIM)
417 to_send.append(DELIM)
394 to_send.append(self.sign(msg))
418 to_send.append(self.sign(msg))
395 to_send.extend(msg)
419 to_send.extend(msg)
396 stream.send_multipart(msg, flags, copy=copy)
420 stream.send_multipart(msg, flags, copy=copy)
397
421
398 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
422 def recv(self, socket, mode=zmq.NOBLOCK, content=True, copy=True):
399 """receives and unpacks a message
423 """receives and unpacks a message
400 returns [idents], msg"""
424 returns [idents], msg"""
401 if isinstance(socket, ZMQStream):
425 if isinstance(socket, ZMQStream):
402 socket = socket.socket
426 socket = socket.socket
403 try:
427 try:
404 msg = socket.recv_multipart(mode)
428 msg = socket.recv_multipart(mode)
405 except zmq.ZMQError as e:
429 except zmq.ZMQError as e:
406 if e.errno == zmq.EAGAIN:
430 if e.errno == zmq.EAGAIN:
407 # We can convert EAGAIN to None as we know in this case
431 # We can convert EAGAIN to None as we know in this case
408 # recv_multipart won't return None.
432 # recv_multipart won't return None.
409 return None,None
433 return None,None
410 else:
434 else:
411 raise
435 raise
412 # return an actual Message object
436 # return an actual Message object
413 # determine the number of idents by trying to unpack them.
437 # determine the number of idents by trying to unpack them.
414 # this is terrible:
438 # this is terrible:
415 idents, msg = self.feed_identities(msg, copy)
439 idents, msg = self.feed_identities(msg, copy)
416 try:
440 try:
417 return idents, self.unpack_message(msg, content=content, copy=copy)
441 return idents, self.unpack_message(msg, content=content, copy=copy)
418 except Exception as e:
442 except Exception as e:
419 print (idents, msg)
443 print (idents, msg)
420 # TODO: handle it
444 # TODO: handle it
421 raise e
445 raise e
422
446
423 def feed_identities(self, msg, copy=True):
447 def feed_identities(self, msg, copy=True):
424 """feed until DELIM is reached, then return the prefix as idents and remainder as
448 """feed until DELIM is reached, then return the prefix as idents and remainder as
425 msg. This is easily broken by setting an IDENT to DELIM, but that would be silly.
449 msg. This is easily broken by setting an IDENT to DELIM, but that would be silly.
426
450
427 Parameters
451 Parameters
428 ----------
452 ----------
429 msg : a list of Message or bytes objects
453 msg : a list of Message or bytes objects
430 the message to be split
454 the message to be split
431 copy : bool
455 copy : bool
432 flag determining whether the arguments are bytes or Messages
456 flag determining whether the arguments are bytes or Messages
433
457
434 Returns
458 Returns
435 -------
459 -------
436 (idents,msg) : two lists
460 (idents,msg) : two lists
437 idents will always be a list of bytes - the indentity prefix
461 idents will always be a list of bytes - the indentity prefix
438 msg will be a list of bytes or Messages, unchanged from input
462 msg will be a list of bytes or Messages, unchanged from input
439 msg should be unpackable via self.unpack_message at this point.
463 msg should be unpackable via self.unpack_message at this point.
440 """
464 """
441 if copy:
465 if copy:
442 idx = msg.index(DELIM)
466 idx = msg.index(DELIM)
443 return msg[:idx], msg[idx+1:]
467 return msg[:idx], msg[idx+1:]
444 else:
468 else:
445 failed = True
469 failed = True
446 for idx,m in enumerate(msg):
470 for idx,m in enumerate(msg):
447 if m.bytes == DELIM:
471 if m.bytes == DELIM:
448 failed = False
472 failed = False
449 break
473 break
450 if failed:
474 if failed:
451 raise ValueError("DELIM not in msg")
475 raise ValueError("DELIM not in msg")
452 idents, msg = msg[:idx], msg[idx+1:]
476 idents, msg = msg[:idx], msg[idx+1:]
453 return [m.bytes for m in idents], msg
477 return [m.bytes for m in idents], msg
454
478
455 def unpack_message(self, msg, content=True, copy=True):
479 def unpack_message(self, msg, content=True, copy=True):
456 """Return a message object from the format
480 """Return a message object from the format
457 sent by self.send.
481 sent by self.send.
458
482
459 Parameters:
483 Parameters:
460 -----------
484 -----------
461
485
462 content : bool (True)
486 content : bool (True)
463 whether to unpack the content dict (True),
487 whether to unpack the content dict (True),
464 or leave it serialized (False)
488 or leave it serialized (False)
465
489
466 copy : bool (True)
490 copy : bool (True)
467 whether to return the bytes (True),
491 whether to return the bytes (True),
468 or the non-copying Message object in each place (False)
492 or the non-copying Message object in each place (False)
469
493
470 """
494 """
471 minlen = 4
495 minlen = 4
472 message = {}
496 message = {}
473 if not copy:
497 if not copy:
474 for i in range(minlen):
498 for i in range(minlen):
475 msg[i] = msg[i].bytes
499 msg[i] = msg[i].bytes
476 if self.auth is not None:
500 if self.auth is not None:
477 signature = msg[0]
501 signature = msg[0]
478 if signature in self.digest_history:
502 if signature in self.digest_history:
479 raise ValueError("Duplicate Signature: %r"%signature)
503 raise ValueError("Duplicate Signature: %r"%signature)
480 self.digest_history.add(signature)
504 self.digest_history.add(signature)
481 check = self.sign(msg[1:4])
505 check = self.sign(msg[1:4])
482 if not signature == check:
506 if not signature == check:
483 raise ValueError("Invalid Signature: %r"%signature)
507 raise ValueError("Invalid Signature: %r"%signature)
484 if not len(msg) >= minlen:
508 if not len(msg) >= minlen:
485 raise TypeError("malformed message, must have at least %i elements"%minlen)
509 raise TypeError("malformed message, must have at least %i elements"%minlen)
486 message['header'] = self.unpack(msg[1])
510 message['header'] = self.unpack(msg[1])
487 message['msg_type'] = message['header']['msg_type']
511 message['msg_type'] = message['header']['msg_type']
488 message['parent_header'] = self.unpack(msg[2])
512 message['parent_header'] = self.unpack(msg[2])
489 if content:
513 if content:
490 message['content'] = self.unpack(msg[3])
514 message['content'] = self.unpack(msg[3])
491 else:
515 else:
492 message['content'] = msg[3]
516 message['content'] = msg[3]
493
517
494 message['buffers'] = msg[4:]
518 message['buffers'] = msg[4:]
495 return message
519 return message
496
520
def test_msg2obj():
    """Smoke-test Message: attribute access, nesting, subscripting, dict round-trip."""
    # flat dict: keys become attributes
    src = dict(x=1)
    obj = Message(src)
    assert obj.x == src['x']

    # nested dict: nesting becomes chained attribute access
    src['y'] = dict(z=1)
    obj = Message(src)
    assert obj.y.z == src['y']['z']

    # subscript access works alongside attribute access
    outer, inner = 'y', 'z'
    assert obj[outer][inner] == src[outer][inner]

    # converting back to a plain dict preserves all values
    round_trip = dict(obj)
    assert src['x'] == round_trip['x']
    assert src['y']['z'] == round_trip['y']['z']
General Comments 0
You need to be logged in to leave comments. Login now