newparallel tweaks, fixes...
MinRK
@@ -1,1281 +1,1313 @@
1 """A semi-synchronous Client for the ZMQ controller"""
1 """A semi-synchronous Client for the ZMQ controller"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 import os
13 import os
14 import time
14 import time
15 from getpass import getpass
15 from getpass import getpass
16 from pprint import pprint
16 from pprint import pprint
17 from datetime import datetime
17 from datetime import datetime
18 import warnings
18 import json
19 import json
19 pjoin = os.path.join
20 pjoin = os.path.join
20
21
21 import zmq
22 import zmq
22 from zmq.eventloop import ioloop, zmqstream
23 from zmq.eventloop import ioloop, zmqstream
23
24
24 from IPython.utils.path import get_ipython_dir
25 from IPython.utils.path import get_ipython_dir
25 from IPython.external.decorator import decorator
26 from IPython.external.decorator import decorator
26 from IPython.external.ssh import tunnel
27 from IPython.external.ssh import tunnel
27
28
28 import streamsession as ss
29 import streamsession as ss
29 from clusterdir import ClusterDir, ClusterDirError
30 from clusterdir import ClusterDir, ClusterDirError
30 # from remotenamespace import RemoteNamespace
31 # from remotenamespace import RemoteNamespace
31 from view import DirectView, LoadBalancedView
32 from view import DirectView, LoadBalancedView
32 from dependency import Dependency, depend, require
33 from dependency import Dependency, depend, require
33 import error
34 import error
34 import map as Map
35 import map as Map
35 from asyncresult import AsyncResult, AsyncMapResult
36 from asyncresult import AsyncResult, AsyncMapResult
36 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
37 from remotefunction import remote,parallel,ParallelFunction,RemoteFunction
37 from util import ReverseDict, disambiguate_url, validate_url
38 from util import ReverseDict, disambiguate_url, validate_url
38
39
39 #--------------------------------------------------------------------------
40 #--------------------------------------------------------------------------
40 # helpers for implementing old MEC API via client.apply
41 # helpers for implementing old MEC API via client.apply
41 #--------------------------------------------------------------------------
42 #--------------------------------------------------------------------------
42
43
43 def _push(ns):
44 def _push(ns):
44 """helper method for implementing `client.push` via `client.apply`"""
45 """helper method for implementing `client.push` via `client.apply`"""
45 globals().update(ns)
46 globals().update(ns)
46
47
47 def _pull(keys):
48 def _pull(keys):
48 """helper method for implementing `client.pull` via `client.apply`"""
49 """helper method for implementing `client.pull` via `client.apply`"""
49 g = globals()
50 g = globals()
50 if isinstance(keys, (list,tuple, set)):
51 if isinstance(keys, (list,tuple, set)):
51 for key in keys:
52 for key in keys:
52 if not g.has_key(key):
53 if not g.has_key(key):
53 raise NameError("name '%s' is not defined"%key)
54 raise NameError("name '%s' is not defined"%key)
54 return map(g.get, keys)
55 return map(g.get, keys)
55 else:
56 else:
56 if not g.has_key(keys):
57 if not g.has_key(keys):
57 raise NameError("name '%s' is not defined"%keys)
58 raise NameError("name '%s' is not defined"%keys)
58 return g.get(keys)
59 return g.get(keys)
59
60
60 def _clear():
61 def _clear():
61 """helper method for implementing `client.clear` via `client.apply`"""
62 """helper method for implementing `client.clear` via `client.apply`"""
62 globals().clear()
63 globals().clear()
63
64
64 def _execute(code):
65 def _execute(code):
65 """helper method for implementing `client.execute` via `client.apply`"""
66 """helper method for implementing `client.execute` via `client.apply`"""
66 exec code in globals()
67 exec code in globals()
67
68
68
69
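# Usage sketch (illustrative, not part of the file): these module-level helpers
# are shipped to the engines through `Client.apply`, exactly the call pattern
# `execute` uses further down. `rc` is a hypothetical connected Client:
#
#     rc.apply(_push, ({'a': 5},), targets=0, bound=True)        # client.push
#     rc.apply(_pull, (('a',),), targets=0, bound=True)          # -> [5]
#     rc.apply(_execute, ('b = a * 2',), targets=0, bound=True)  # client.execute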
 #--------------------------------------------------------------------------
 # Decorators for Client methods
 #--------------------------------------------------------------------------
 
 @decorator
 def spinfirst(f, self, *args, **kwargs):
     """Call spin() to sync state prior to calling the method."""
     self.spin()
     return f(self, *args, **kwargs)
 
 @decorator
 def defaultblock(f, self, *args, **kwargs):
     """Default to self.block; preserve self.block."""
     block = kwargs.get('block',None)
     block = self.block if block is None else block
     saveblock = self.block
     self.block = block
     try:
         ret = f(self, *args, **kwargs)
     finally:
         self.block = saveblock
     return ret
 
 
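# Sketch of the @defaultblock contract (illustrative; `rc` is a hypothetical
# connected Client): a per-call `block` kwarg wins for that call only, and the
# client-wide default is restored even if the wrapped method raises:
#
#     rc.block = False
#     rc.clear(block=True)       # this one call runs synchronously
#     assert rc.block is False   # the client default is untouched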
 #--------------------------------------------------------------------------
 # Classes
 #--------------------------------------------------------------------------
 
 class Metadata(dict):
     """Subclass of dict for initializing metadata values.
     
     Attribute access works on keys.
     
     These objects have a strict set of keys - errors will raise if you try
     to add new keys.
     """
     def __init__(self, *args, **kwargs):
         dict.__init__(self)
         md = {'msg_id' : None,
               'submitted' : None,
               'started' : None,
               'completed' : None,
               'received' : None,
               'engine_uuid' : None,
               'engine_id' : None,
               'follow' : None,
               'after' : None,
               'status' : None,
 
               'pyin' : None,
               'pyout' : None,
               'pyerr' : None,
               'stdout' : '',
               'stderr' : '',
             }
         self.update(md)
         self.update(dict(*args, **kwargs))
     
     def __getattr__(self, key):
         """getattr aliased to getitem"""
         if key in self.iterkeys():
             return self[key]
         else:
             raise AttributeError(key)
     
     def __setattr__(self, key, value):
         """setattr aliased to setitem, with strict"""
         if key in self.iterkeys():
             self[key] = value
         else:
             raise AttributeError(key)
     
     def __setitem__(self, key, value):
         """strict static key enforcement"""
         if key in self.iterkeys():
             dict.__setitem__(self, key, value)
         else:
             raise KeyError(key)
 
 
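# Illustration of the strict-key behavior (sketch):
#
#     md = Metadata(status='ok')
#     md.stdout              # '' -- attribute access reads keys
#     md['engine_id'] = 0    # assignment to a known key is fine
#     md.nonsense = 1        # raises AttributeError: the key set is fixed
#     md['nonsense'] = 1     # raises KeyError for the same reason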
 class Client(object):
     """A semi-synchronous client to the IPython ZMQ controller
     
     Parameters
     ----------
     
     url_or_file : bytes; zmq url or path to ipcontroller-client.json
         Connection information for the Hub's registration. If a json connector
         file is given, then likely no further configuration is necessary.
         [Default: use profile]
     profile : bytes
         The name of the Cluster profile to be used to find connector information.
         [Default: 'default']
     context : zmq.Context
         Pass an existing zmq.Context instance, otherwise the client will create its own.
     username : bytes
         set username to be passed to the Session object
     debug : bool
         flag for lots of message printing for debug purposes
     
     #-------------- ssh related args ----------------
     # These are args for configuring the ssh tunnel to be used
     # credentials are used to forward connections over ssh to the Controller
     # Note that the ip given in `addr` needs to be relative to sshserver
     # The most basic case is to leave addr as pointing to localhost (127.0.0.1),
     # and set sshserver as the same machine the Controller is on. However,
     # the only requirement is that sshserver is able to see the Controller
     # (i.e. is within the same trusted network).
     
     sshserver : str
         A string of the form passed to ssh, i.e. 'server.tld' or 'user@server.tld:port'
         If keyfile or password is specified, and this is not, it will default to
         the ip given in addr.
     sshkey : str; path to public ssh key file
         This specifies a key to be used in ssh login, default None.
         Regular default ssh keys will be used without specifying this argument.
     password : str
         Your ssh password to sshserver. Note that if this is left None,
         you will be prompted for it if passwordless key based login is unavailable.
     paramiko : bool
         flag for whether to use paramiko instead of shell ssh for tunneling.
         [default: True on win32, False else]
     
     #------- exec authentication args -------
     # If even localhost is untrusted, you can have some protection against
     # unauthorized execution by using a key. Messages are still sent
     # as cleartext, so if someone can snoop your loopback traffic this will
     # not help against malicious attacks.
     
     exec_key : str
         an authentication key or file containing a key
         default: None
     
     
     Attributes
     ----------
     ids : set of int engine IDs
         requesting the ids attribute always synchronizes
         the registration state. To request ids without synchronization,
         use the semi-private _ids attribute.
     
     history : list of msg_ids
         a list of msg_ids, keeping track of all the execution
         messages you have submitted in order.
     
     outstanding : set of msg_ids
         a set of msg_ids that have been submitted, but whose
         results have not yet been received.
     
     results : dict
         a dict of all our results, keyed by msg_id
     
     block : bool
         determines default behavior when block not specified
         in execution methods
     
     Methods
     -------
     spin : flushes incoming results and registration state changes
         control methods spin, and requesting `ids` also ensures up to date
     
     barrier : wait on one or more msg_ids
     
     execution methods: apply/apply_bound/apply_to/apply_bound
         legacy: execute, run
     
     query methods: queue_status, get_result, purge
     
     control methods: abort, kill
     
     """
 
 
     _connected=False
     _ssh=False
     _engines=None
     _registration_socket=None
     _query_socket=None
     _control_socket=None
     _iopub_socket=None
     _notification_socket=None
     _mux_socket=None
     _task_socket=None
+    _task_scheme=None
     block = False
     outstanding=None
     results = None
     history = None
     debug = False
     targets = None
     
     def __init__(self, url_or_file=None, profile='default', cluster_dir=None, ipython_dir=None,
             context=None, username=None, debug=False, exec_key=None,
             sshserver=None, sshkey=None, password=None, paramiko=None,
             ):
         if context is None:
             context = zmq.Context()
         self.context = context
         self.targets = 'all'
         
         self._setup_cluster_dir(profile, cluster_dir, ipython_dir)
         if self._cd is not None:
             if url_or_file is None:
                 url_or_file = pjoin(self._cd.security_dir, 'ipcontroller-client.json')
         assert url_or_file is not None, "I can't find enough information to connect to a controller!"\
             " Please specify at least one of url_or_file or profile."
         
         try:
             validate_url(url_or_file)
         except AssertionError:
             if not os.path.exists(url_or_file):
                 if self._cd:
                     url_or_file = os.path.join(self._cd.security_dir, url_or_file)
                 assert os.path.exists(url_or_file), "Not a valid connection file or url: %r"%url_or_file
             with open(url_or_file) as f:
                 cfg = json.loads(f.read())
         else:
             cfg = {'url':url_or_file}
         
         # sync defaults from args, json:
         if sshserver:
             cfg['ssh'] = sshserver
         if exec_key:
             cfg['exec_key'] = exec_key
         exec_key = cfg['exec_key']
         sshserver=cfg['ssh']
         url = cfg['url']
         location = cfg.setdefault('location', None)
         cfg['url'] = disambiguate_url(cfg['url'], location)
         url = cfg['url']
         
         self._config = cfg
         
         
         self._ssh = bool(sshserver or sshkey or password)
         if self._ssh and sshserver is None:
             # default to ssh via localhost
             sshserver = url.split('://')[1].split(':')[0]
         if self._ssh and password is None:
             if tunnel.try_passwordless_ssh(sshserver, sshkey, paramiko):
                 password=False
             else:
                 password = getpass("SSH Password for %s: "%sshserver)
         ssh_kwargs = dict(keyfile=sshkey, password=password, paramiko=paramiko)
         if exec_key is not None and os.path.isfile(exec_key):
             arg = 'keyfile'
         else:
             arg = 'key'
         key_arg = {arg:exec_key}
         if username is None:
             self.session = ss.StreamSession(**key_arg)
         else:
             self.session = ss.StreamSession(username, **key_arg)
         self._registration_socket = self.context.socket(zmq.XREQ)
         self._registration_socket.setsockopt(zmq.IDENTITY, self.session.session)
         if self._ssh:
             tunnel.tunnel_connection(self._registration_socket, url, sshserver, **ssh_kwargs)
         else:
             self._registration_socket.connect(url)
         self._engines = ReverseDict()
         self._ids = set()
         self.outstanding=set()
         self.results = {}
         self.metadata = {}
         self.history = []
         self.debug = debug
         self.session.debug = debug
         
         self._notification_handlers = {'registration_notification' : self._register_engine,
                                        'unregistration_notification' : self._unregister_engine,
                                        }
         self._queue_handlers = {'execute_reply' : self._handle_execute_reply,
                                 'apply_reply' : self._handle_apply_reply}
         self._connect(sshserver, ssh_kwargs)
     
     
     def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir):
         if ipython_dir is None:
             ipython_dir = get_ipython_dir()
         if cluster_dir is not None:
             try:
                 self._cd = ClusterDir.find_cluster_dir(cluster_dir)
             except ClusterDirError:
                 pass
         elif profile is not None:
             try:
                 self._cd = ClusterDir.find_cluster_dir_by_profile(
                     ipython_dir, profile)
             except ClusterDirError:
                 pass
         else:
             self._cd = None
     
     @property
     def ids(self):
-        """Always up to date ids property."""
+        """Always up-to-date ids property."""
         self._flush_notifications()
         return self._ids
 
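# Sketch: `ids` flushes pending registration notifications before returning,
# so it reflects the current registration state; `_ids` skips that flush:
#
#     rc.ids     # e.g. set([0, 1, 2, 3]), freshly synchronized
#     rc._ids    # same set, but possibly stale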
     def _update_engines(self, engines):
         """Update our engines dict and _ids from a dict of the form: {id:uuid}."""
         for k,v in engines.iteritems():
             eid = int(k)
             self._engines[eid] = bytes(v) # force not unicode
             self._ids.add(eid)
+        if sorted(self._engines.keys()) != range(len(self._engines)) and \
+                        self._task_scheme == 'pure' and self._task_socket:
+            self._stop_scheduling_tasks()
+    
+    def _stop_scheduling_tasks(self):
+        """Stop scheduling tasks because an engine has been unregistered
+        from a pure ZMQ scheduler.
+        """
+        
+        self._task_socket.close()
+        self._task_socket = None
+        msg = "An engine has been unregistered, and we are using pure " +\
+              "ZMQ task scheduling. Task farming will be disabled."
+        if self.outstanding:
+            msg += " If you were running tasks when this happened, " +\
+                   "some `outstanding` msg_ids may never resolve."
+        warnings.warn(msg, RuntimeWarning)
 
     def _build_targets(self, targets):
         """Turn valid target IDs or 'all' into two lists:
         (uuids, int_ids).
         """
         if targets is None:
             targets = self._ids
         elif isinstance(targets, str):
             if targets.lower() == 'all':
                 targets = self._ids
             else:
                 raise TypeError("%r not valid str target, must be 'all'"%(targets))
         elif isinstance(targets, int):
             targets = [targets]
         return [self._engines[t] for t in targets], list(targets)
 
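# Sketch of the mapping (the uuids are the engines' queue identities; `rc` and
# the uuid names are illustrative):
#
#     rc._build_targets('all')   # -> ([uuid_0, uuid_1, ...], [0, 1, ...])
#     rc._build_targets(0)       # -> ([uuid_0], [0])
#     rc._build_targets([1, 2])  # -> ([uuid_1, uuid_2], [1, 2])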
     def _connect(self, sshserver, ssh_kwargs):
         """setup all our socket connections to the controller. This is called from
         __init__."""
+        
+        # Maybe allow reconnecting?
         if self._connected:
             return
         self._connected=True
         
         def connect_socket(s, url):
             url = disambiguate_url(url, self._config['location'])
             if self._ssh:
                 return tunnel.tunnel_connection(s, url, sshserver, **ssh_kwargs)
             else:
                 return s.connect(url)
         
         self.session.send(self._registration_socket, 'connection_request')
         idents,msg = self.session.recv(self._registration_socket,mode=0)
         if self.debug:
             pprint(msg)
         msg = ss.Message(msg)
         content = msg.content
+        self._config['registration'] = dict(content)
         if content.status == 'ok':
             if content.mux:
                 self._mux_socket = self.context.socket(zmq.PAIR)
                 self._mux_socket.setsockopt(zmq.IDENTITY, self.session.session)
                 connect_socket(self._mux_socket, content.mux)
             if content.task:
+                self._task_scheme, task_addr = content.task
                 self._task_socket = self.context.socket(zmq.PAIR)
                 self._task_socket.setsockopt(zmq.IDENTITY, self.session.session)
-                connect_socket(self._task_socket, content.task)
+                connect_socket(self._task_socket, task_addr)
             if content.notification:
                 self._notification_socket = self.context.socket(zmq.SUB)
                 connect_socket(self._notification_socket, content.notification)
                 self._notification_socket.setsockopt(zmq.SUBSCRIBE, "")
             if content.query:
                 self._query_socket = self.context.socket(zmq.PAIR)
                 self._query_socket.setsockopt(zmq.IDENTITY, self.session.session)
                 connect_socket(self._query_socket, content.query)
             if content.control:
                 self._control_socket = self.context.socket(zmq.PAIR)
                 self._control_socket.setsockopt(zmq.IDENTITY, self.session.session)
                 connect_socket(self._control_socket, content.control)
             if content.iopub:
                 self._iopub_socket = self.context.socket(zmq.SUB)
                 self._iopub_socket.setsockopt(zmq.SUBSCRIBE, '')
                 self._iopub_socket.setsockopt(zmq.IDENTITY, self.session.session)
                 connect_socket(self._iopub_socket, content.iopub)
             self._update_engines(dict(content.engines))
         
         else:
             self._connected = False
             raise Exception("Failed to connect!")
     
     #--------------------------------------------------------------------------
     # handlers and callbacks for incoming messages
     #--------------------------------------------------------------------------
     
     def _register_engine(self, msg):
         """Register a new engine, and update our connection info."""
         content = msg['content']
         eid = content['id']
         d = {eid : content['queue']}
         self._update_engines(d)
         self._ids.add(int(eid))
     
     def _unregister_engine(self, msg):
         """Unregister an engine that has died."""
         content = msg['content']
         eid = int(content['id'])
         if eid in self._ids:
             self._ids.remove(eid)
             self._engines.pop(eid)
+        if self._task_socket and self._task_scheme == 'pure':
+            self._stop_scheduling_tasks()
     
     def _extract_metadata(self, header, parent, content):
         md = {'msg_id' : parent['msg_id'],
               'received' : datetime.now(),
               'engine_uuid' : header.get('engine', None),
               'follow' : parent.get('follow', []),
               'after' : parent.get('after', []),
               'status' : content['status'],
             }
         
         if md['engine_uuid'] is not None:
             md['engine_id'] = self._engines.get(md['engine_uuid'], None)
         
         if 'date' in parent:
             md['submitted'] = datetime.strptime(parent['date'], ss.ISO8601)
         if 'started' in header:
             md['started'] = datetime.strptime(header['started'], ss.ISO8601)
         if 'date' in header:
             md['completed'] = datetime.strptime(header['date'], ss.ISO8601)
         return md
     
     def _handle_execute_reply(self, msg):
         """Save the reply to an execute_request into our results.
         
         execute messages are never actually used. apply is used instead.
         """
         
         parent = msg['parent_header']
         msg_id = parent['msg_id']
         if msg_id not in self.outstanding:
             if msg_id in self.history:
                 print ("got stale result: %s"%msg_id)
             else:
                 print ("got unknown result: %s"%msg_id)
         else:
             self.outstanding.remove(msg_id)
         self.results[msg_id] = ss.unwrap_exception(msg['content'])
     
     def _handle_apply_reply(self, msg):
         """Save the reply to an apply_request into our results."""
         parent = msg['parent_header']
         msg_id = parent['msg_id']
         if msg_id not in self.outstanding:
             if msg_id in self.history:
                 print ("got stale result: %s"%msg_id)
                 print self.results[msg_id]
                 print msg
             else:
                 print ("got unknown result: %s"%msg_id)
         else:
             self.outstanding.remove(msg_id)
         content = msg['content']
         header = msg['header']
         
         # construct metadata:
         md = self.metadata.setdefault(msg_id, Metadata())
         md.update(self._extract_metadata(header, parent, content))
         self.metadata[msg_id] = md
         
         # construct result:
         if content['status'] == 'ok':
             self.results[msg_id] = ss.unserialize_object(msg['buffers'])[0]
         elif content['status'] == 'aborted':
             self.results[msg_id] = error.AbortedTask(msg_id)
         elif content['status'] == 'resubmitted':
             # TODO: handle resubmission
             pass
         else:
             e = ss.unwrap_exception(content)
             if e.engine_info:
                 e_uuid = e.engine_info['engineid']
                 eid = self._engines[e_uuid]
                 e.engine_info['engineid'] = eid
             self.results[msg_id] = e
     
     def _flush_notifications(self):
         """Flush notifications of engine registrations waiting
         in ZMQ queue."""
         msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
         while msg is not None:
             if self.debug:
                 pprint(msg)
             msg = msg[-1]
             msg_type = msg['msg_type']
             handler = self._notification_handlers.get(msg_type, None)
             if handler is None:
                 raise Exception("Unhandled message type: %s"%msg.msg_type)
             else:
                 handler(msg)
             msg = self.session.recv(self._notification_socket, mode=zmq.NOBLOCK)
     
     def _flush_results(self, sock):
         """Flush task or queue results waiting in ZMQ queue."""
         msg = self.session.recv(sock, mode=zmq.NOBLOCK)
         while msg is not None:
             if self.debug:
                 pprint(msg)
             msg = msg[-1]
             msg_type = msg['msg_type']
             handler = self._queue_handlers.get(msg_type, None)
             if handler is None:
                 raise Exception("Unhandled message type: %s"%msg.msg_type)
             else:
                 handler(msg)
             msg = self.session.recv(sock, mode=zmq.NOBLOCK)
     
     def _flush_control(self, sock):
         """Flush replies from the control channel waiting
         in the ZMQ queue.
         
         Currently: ignore them."""
         msg = self.session.recv(sock, mode=zmq.NOBLOCK)
         while msg is not None:
             if self.debug:
                 pprint(msg)
             msg = self.session.recv(sock, mode=zmq.NOBLOCK)
     
     def _flush_iopub(self, sock):
         """Flush replies from the iopub channel waiting
         in the ZMQ queue.
         """
         msg = self.session.recv(sock, mode=zmq.NOBLOCK)
         while msg is not None:
             if self.debug:
                 pprint(msg)
             msg = msg[-1]
             parent = msg['parent_header']
             msg_id = parent['msg_id']
             content = msg['content']
             header = msg['header']
             msg_type = msg['msg_type']
             
             # init metadata:
             md = self.metadata.setdefault(msg_id, Metadata())
             
             if msg_type == 'stream':
                 name = content['name']
                 s = md[name] or ''
                 md[name] = s + content['data']
             elif msg_type == 'pyerr':
                 md.update({'pyerr' : ss.unwrap_exception(content)})
             else:
                 md.update({msg_type : content['data']})
             
             self.metadata[msg_id] = md
             
             msg = self.session.recv(sock, mode=zmq.NOBLOCK)
     
     #--------------------------------------------------------------------------
     # getitem
     #--------------------------------------------------------------------------
     
     def __getitem__(self, key):
         """Dict access returns DirectView multiplexer objects or,
         if key is None, a LoadBalancedView."""
         if key is None:
             return LoadBalancedView(self)
         if isinstance(key, int):
             if key not in self.ids:
                 raise IndexError("No such engine: %i"%key)
             return DirectView(self, key)
         
         if isinstance(key, slice):
             indices = range(len(self.ids))[key]
             ids = sorted(self._ids)
             key = [ ids[i] for i in indices ]
             # newkeys = sorted(self._ids)[thekeys[k]]
         
         if isinstance(key, (tuple, list, xrange)):
             _,targets = self._build_targets(list(key))
             return DirectView(self, targets)
         else:
             raise TypeError("key by int/iterable of ints only, not %s"%(type(key)))
 
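# Indexing sketch (hypothetical `rc` with four engines, ids 0-3):
#
#     rc[None]    # LoadBalancedView over the task scheduler
#     rc[0]       # DirectView of engine 0
#     rc[::2]     # DirectView of engines [0, 2]
#     rc[(1, 3)]  # DirectView of engines [1, 3]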
     #--------------------------------------------------------------------------
     # Begin public methods
     #--------------------------------------------------------------------------
     
     @property
     def remote(self):
         """property for convenient RemoteFunction generation.
         
         >>> @client.remote
         ... def f():
                 import os
                 print (os.getpid())
         """
         return remote(self, block=self.block)
     
     def spin(self):
         """Flush any registration notifications and execution results
         waiting in the ZMQ queue.
         """
         if self._notification_socket:
             self._flush_notifications()
         if self._mux_socket:
             self._flush_results(self._mux_socket)
         if self._task_socket:
             self._flush_results(self._task_socket)
         if self._control_socket:
             self._flush_control(self._control_socket)
         if self._iopub_socket:
             self._flush_iopub(self._iopub_socket)
     
     def barrier(self, msg_ids=None, timeout=-1):
         """waits on one or more `msg_ids`, for up to `timeout` seconds.
         
         Parameters
         ----------
         msg_ids : int, str, or list of ints and/or strs, or one or more AsyncResult objects
                 ints are indices to self.history
                 strs are msg_ids
                 default: wait on all outstanding messages
         timeout : float
                 a time in seconds, after which to give up.
                 default is -1, which means no timeout
         
         Returns
         -------
         True : when all msg_ids are done
         False : timeout reached, some msg_ids still outstanding
         """
         tic = time.time()
         if msg_ids is None:
             theids = self.outstanding
         else:
             if isinstance(msg_ids, (int, str, AsyncResult)):
                 msg_ids = [msg_ids]
             theids = set()
             for msg_id in msg_ids:
                 if isinstance(msg_id, int):
                     msg_id = self.history[msg_id]
                 elif isinstance(msg_id, AsyncResult):
                     map(theids.add, msg_id.msg_ids)
                     continue
                 theids.add(msg_id)
         if not theids.intersection(self.outstanding):
             return True
         self.spin()
         while theids.intersection(self.outstanding):
             if timeout >= 0 and ( time.time()-tic ) > timeout:
                 break
             time.sleep(1e-3)
             self.spin()
         return len(theids.intersection(self.outstanding)) == 0
 
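# Sketch: barrier accepts history indices, msg_ids, or AsyncResults, and
# reports whether everything finished in time (`rc` and `ar` illustrative):
#
#     rc.barrier()                         # block until nothing is outstanding
#     rc.barrier(-1)                       # wait on the most recent submission
#     done = rc.barrier(ar, timeout=5.0)   # up to 5s on an AsyncResult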
     #--------------------------------------------------------------------------
     # Control methods
     #--------------------------------------------------------------------------
     
     @spinfirst
     @defaultblock
     def clear(self, targets=None, block=None):
         """Clear the namespace in target(s)."""
         targets = self._build_targets(targets)[0]
         for t in targets:
             self.session.send(self._control_socket, 'clear_request', content={}, ident=t)
         error = False
         if self.block:
             for i in range(len(targets)):
                 idents,msg = self.session.recv(self._control_socket,0)
                 if self.debug:
                     pprint(msg)
                 if msg['content']['status'] != 'ok':
                     error = ss.unwrap_exception(msg['content'])
         if error:
             return error
         
     
     @spinfirst
     @defaultblock
     def abort(self, msg_ids = None, targets=None, block=None):
         """Abort the execution queues of target(s)."""
         targets = self._build_targets(targets)[0]
         if isinstance(msg_ids, basestring):
             msg_ids = [msg_ids]
         content = dict(msg_ids=msg_ids)
         for t in targets:
             self.session.send(self._control_socket, 'abort_request',
                     content=content, ident=t)
         error = False
         if self.block:
             for i in range(len(targets)):
                 idents,msg = self.session.recv(self._control_socket,0)
                 if self.debug:
                     pprint(msg)
                 if msg['content']['status'] != 'ok':
                     error = ss.unwrap_exception(msg['content'])
         if error:
             return error
     
     @spinfirst
     @defaultblock
     def shutdown(self, targets=None, restart=False, controller=False, block=None):
         """Terminates one or more engine processes, optionally including the controller."""
         if controller:
             targets = 'all'
         targets = self._build_targets(targets)[0]
         for t in targets:
             self.session.send(self._control_socket, 'shutdown_request',
                         content={'restart':restart},ident=t)
         error = False
         if block or controller:
             for i in range(len(targets)):
                 idents,msg = self.session.recv(self._control_socket,0)
                 if self.debug:
                     pprint(msg)
                 if msg['content']['status'] != 'ok':
                     error = ss.unwrap_exception(msg['content'])
         
         if controller:
             time.sleep(0.25)
             self.session.send(self._query_socket, 'shutdown_request')
             idents,msg = self.session.recv(self._query_socket, 0)
             if self.debug:
                 pprint(msg)
             if msg['content']['status'] != 'ok':
                 error = ss.unwrap_exception(msg['content'])
         
         if error:
             raise error
 
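# Control-channel sketch (hypothetical `rc`):
#
#     rc.clear(targets=[0, 1])      # wipe the namespaces of engines 0 and 1
#     rc.abort(block=True)          # abort queued requests, wait for replies
#     rc.shutdown(controller=True)  # stop all engines, then the controller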
     #--------------------------------------------------------------------------
     # Execution methods
     #--------------------------------------------------------------------------
     
     @defaultblock
     def execute(self, code, targets='all', block=None):
         """Executes `code` on `targets` in blocking or nonblocking manner.
         
         ``execute`` is always `bound` (affects engine namespace)
         
         Parameters
         ----------
         code : str
                 the code string to be executed
         targets : int/str/list of ints/strs
                 the engines on which to execute
                 default : all
         block : bool
                 whether or not to wait until done to return
                 default: self.block
         """
         result = self.apply(_execute, (code,), targets=targets, block=self.block, bound=True)
         return result
 
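# Sketch (hypothetical `rc`):
#
#     rc.execute('import numpy', targets='all')       # bound: affects namespaces
#     rc.execute('a = 5', targets=[0, 1], block=True)  # wait for completion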
     def run(self, filename, targets='all', block=None):
         """Execute contents of `filename` on engine(s).
         
         This simply reads the contents of the file and calls `execute`.
         
         Parameters
         ----------
         filename : str
                 The path to the file
         targets : int/str/list of ints/strs
                 the engines on which to execute
                 default : all
         block : bool
                 whether or not to wait until done
                 default: self.block
         
         """
         with open(filename, 'rb') as f:
             code = f.read()
         return self.execute(code, targets=targets, block=block)
     
     def _maybe_raise(self, result):
         """wrapper for maybe raising an exception if apply failed."""
         if isinstance(result, error.RemoteError):
             raise result
         
         return result
     
     def _build_dependency(self, dep):
         """helper for building jsonable dependencies from various input forms"""
         if isinstance(dep, Dependency):
             return dep.as_dict()
         elif isinstance(dep, AsyncResult):
             return dep.msg_ids
         elif dep is None:
             return []
         elif isinstance(dep, set):
             return list(dep)
         elif isinstance(dep, (list,dict)):
             return dep
         elif isinstance(dep, str):
             return [dep]
         else:
             raise TypeError("Dependency may be: set,list,dict,Dependency or AsyncResult, not %r"%type(dep))
 
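# Normalization sketch -- every accepted form becomes something json-friendly
# (a list of msg_ids, or a dict for a Dependency). `rc` and `ar` illustrative:
#
#     rc._build_dependency(None)             # -> []
#     rc._build_dependency('some-msg-id')    # -> ['some-msg-id']
#     rc._build_dependency(set(['a', 'b']))  # -> ['a', 'b'] (unordered)
#     rc._build_dependency(ar)               # -> ar.msg_ids (an AsyncResult)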
851 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
875 def apply(self, f, args=None, kwargs=None, bound=True, block=None, targets=None,
852 after=None, follow=None, timeout=None):
876 after=None, follow=None, timeout=None):
853 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
877 """Call `f(*args, **kwargs)` on a remote engine(s), returning the result.
854
878
855 This is the central execution command for the client.
879 This is the central execution command for the client.
856
880
857 Parameters
881 Parameters
858 ----------
882 ----------
859
883
860 f : function
884 f : function
861 The function to be called remotely
885 The function to be called remotely
862 args : tuple/list
886 args : tuple/list
863 The positional arguments passed to `f`
887 The positional arguments passed to `f`
864 kwargs : dict
888 kwargs : dict
865 The keyword arguments passed to `f`
889 The keyword arguments passed to `f`
866 bound : bool (default: True)
890 bound : bool (default: True)
867 Whether to execute in the Engine(s) namespace, or in a clean
891 Whether to execute in the Engine(s) namespace, or in a clean
868 namespace not affecting the engine.
892 namespace not affecting the engine.
869 block : bool (default: self.block)
893 block : bool (default: self.block)
870 Whether to wait for the result, or return immediately.
894 Whether to wait for the result, or return immediately.
871 False:
895 False:
872 returns msg_id(s)
896 returns msg_id(s)
873 if multiple targets:
897 if multiple targets:
874 list of ids
898 list of ids
875 True:
899 True:
876 returns actual result(s) of f(*args, **kwargs)
900 returns actual result(s) of f(*args, **kwargs)
877 if multiple targets:
901 if multiple targets:
878 dict of results, by engine ID
902 dict of results, by engine ID
879 targets : int,list of ints, 'all', None
903 targets : int,list of ints, 'all', None
880 Specify the destination of the job.
904 Specify the destination of the job.
881 if None:
905 if None:
882 Submit via Task queue for load-balancing.
906 Submit via Task queue for load-balancing.
883 if 'all':
907 if 'all':
884 Run on all active engines
908 Run on all active engines
885 if list:
909 if list:
886 Run on each specified engine
910 Run on each specified engine
887 if int:
911 if int:
888 Run on single engine
912 Run on single engine
889
913
890 after : Dependency or collection of msg_ids
914 after : Dependency or collection of msg_ids
891 Only for load-balanced execution (targets=None)
915 Only for load-balanced execution (targets=None)
892 Specify a list of msg_ids as a time-based dependency.
916 Specify a list of msg_ids as a time-based dependency.
893 This job will only be run *after* the dependencies
917 This job will only be run *after* the dependencies
894 have been met.
918 have been met.
895
919
896 follow : Dependency or collection of msg_ids
920 follow : Dependency or collection of msg_ids
897 Only for load-balanced execution (targets=None)
921 Only for load-balanced execution (targets=None)
898 Specify a list of msg_ids as a location-based dependency.
922 Specify a list of msg_ids as a location-based dependency.
899 This job will only be run on an engine where this dependency
923 This job will only be run on an engine where this dependency
900 is met.
924 is met.
901
925
902 timeout : float or None
926 timeout : float or None
903 Only for load-balanced execution (targets=None)
927 Only for load-balanced execution (targets=None)
904 Specify an amount of time (in seconds) to wait for dependencies before failing
928 Specify an amount of time (in seconds) to wait for dependencies before failing
905
929
906 Returns
930 Returns
907 -------
931 -------
908 if block is False:
932 if block is False:
909 if single target:
933 if single target:
910 return msg_id
934 return msg_id
911 else:
935 else:
912 return list of msg_ids
936 return list of msg_ids
913 (TODO: should this be a dict keyed by engine, like block=True?)
937 (TODO: should this be a dict keyed by engine, like block=True?)
914 else:
938 else:
915 if single target:
939 if single target:
916 return result of f(*args, **kwargs)
940 return result of f(*args, **kwargs)
917 else:
941 else:
918 return dict of results, keyed by engine
942 return dict of results, keyed by engine
919 """
943 """
920
944
921 # defaults:
945 # defaults:
922 block = block if block is not None else self.block
946 block = block if block is not None else self.block
923 args = args if args is not None else []
947 args = args if args is not None else []
924 kwargs = kwargs if kwargs is not None else {}
948 kwargs = kwargs if kwargs is not None else {}
925
949
926 # enforce types of f,args,kwargs
950 # enforce types of f,args,kwargs
927 if not callable(f):
951 if not callable(f):
928 raise TypeError("f must be callable, not %s"%type(f))
952 raise TypeError("f must be callable, not %s"%type(f))
929 if not isinstance(args, (tuple, list)):
953 if not isinstance(args, (tuple, list)):
930 raise TypeError("args must be tuple or list, not %s"%type(args))
954 raise TypeError("args must be tuple or list, not %s"%type(args))
931 if not isinstance(kwargs, dict):
955 if not isinstance(kwargs, dict):
932 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
956 raise TypeError("kwargs must be dict, not %s"%type(kwargs))
933
957
934 after = self._build_dependency(after)
958 after = self._build_dependency(after)
935 follow = self._build_dependency(follow)
959 follow = self._build_dependency(follow)
936
960
937 options = dict(bound=bound, block=block)
961 options = dict(bound=bound, block=block)
938
962
939 if targets is None:
963 if targets is None:
940 return self._apply_balanced(f, args, kwargs, timeout=timeout,
964 if self._task_socket:
965 return self._apply_balanced(f, args, kwargs, timeout=timeout,
941 after=after, follow=follow, **options)
966 after=after, follow=follow, **options)
967 else:
968 msg = "Task farming is disabled"
969 if self._task_scheme == 'pure':
970 msg += " because the pure ZMQ scheduler cannot handle"
971 msg += " disappearing engines."
972 raise RuntimeError(msg)
942 else:
973 else:
943 return self._apply_direct(f, args, kwargs, targets=targets, **options)
974 return self._apply_direct(f, args, kwargs, targets=targets, **options)
944
975
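# Usage sketch (illustrative, not part of this changeset): the two
# submission paths of apply(). Assumes `rc` is an already-connected
# Client with a task scheduler available.
def mul(a, b):
    return a * b

ar = rc.apply(mul, (3, 4), block=False)     # targets=None -> load-balanced task queue
print(ar.get())                             # 12, once a free engine has run it
res = rc.apply(mul, (3, 4), targets=[0, 1], block=True)  # direct, via the MUX queue
# with multiple targets and block=True, res holds the per-engine results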
945 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
976 def _apply_balanced(self, f, args, kwargs, bound=True, block=None,
946 after=None, follow=None, timeout=None):
977 after=None, follow=None, timeout=None):
947 """The underlying method for applying functions in a load balanced
978 """The underlying method for applying functions in a load balanced
948 manner, via the task queue."""
979 manner, via the task queue."""
949 subheader = dict(after=after, follow=follow, timeout=timeout)
980 subheader = dict(after=after, follow=follow, timeout=timeout)
950 bufs = ss.pack_apply_message(f,args,kwargs)
981 bufs = ss.pack_apply_message(f,args,kwargs)
951 content = dict(bound=bound)
982 content = dict(bound=bound)
952
983
953 msg = self.session.send(self._task_socket, "apply_request",
984 msg = self.session.send(self._task_socket, "apply_request",
954 content=content, buffers=bufs, subheader=subheader)
985 content=content, buffers=bufs, subheader=subheader)
955 msg_id = msg['msg_id']
986 msg_id = msg['msg_id']
956 self.outstanding.add(msg_id)
987 self.outstanding.add(msg_id)
957 self.history.append(msg_id)
988 self.history.append(msg_id)
958 ar = AsyncResult(self, [msg_id], fname=f.__name__)
989 ar = AsyncResult(self, [msg_id], fname=f.__name__)
959 if block:
990 if block:
960 return ar.get()
991 return ar.get()
961 else:
992 else:
962 return ar
993 return ar
963
994
964 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None):
995 def _apply_direct(self, f, args, kwargs, bound=True, block=None, targets=None):
965 """Then underlying method for applying functions to specific engines
996 """Then underlying method for applying functions to specific engines
966 via the MUX queue."""
997 via the MUX queue."""
967
998
968 queues,targets = self._build_targets(targets)
999 queues,targets = self._build_targets(targets)
969
1000
970 subheader = {}
1001 subheader = {}
971 content = dict(bound=bound)
1002 content = dict(bound=bound)
972 bufs = ss.pack_apply_message(f,args,kwargs)
1003 bufs = ss.pack_apply_message(f,args,kwargs)
973
1004
974 msg_ids = []
1005 msg_ids = []
975 for queue in queues:
1006 for queue in queues:
976 msg = self.session.send(self._mux_socket, "apply_request",
1007 msg = self.session.send(self._mux_socket, "apply_request",
977 content=content, buffers=bufs,ident=queue, subheader=subheader)
1008 content=content, buffers=bufs,ident=queue, subheader=subheader)
978 msg_id = msg['msg_id']
1009 msg_id = msg['msg_id']
979 self.outstanding.add(msg_id)
1010 self.outstanding.add(msg_id)
980 self.history.append(msg_id)
1011 self.history.append(msg_id)
981 msg_ids.append(msg_id)
1012 msg_ids.append(msg_id)
982 ar = AsyncResult(self, msg_ids, fname=f.__name__)
1013 ar = AsyncResult(self, msg_ids, fname=f.__name__)
983 if block:
1014 if block:
984 return ar.get()
1015 return ar.get()
985 else:
1016 else:
986 return ar
1017 return ar
987
1018
988 #--------------------------------------------------------------------------
1019 #--------------------------------------------------------------------------
989 # Map and decorators
1020 # Map and decorators
990 #--------------------------------------------------------------------------
1021 #--------------------------------------------------------------------------
991
1022
992 def map(self, f, *sequences):
1023 def map(self, f, *sequences):
993 """Parallel version of builtin `map`, using all our engines."""
1024 """Parallel version of builtin `map`, using all our engines."""
994 pf = ParallelFunction(self, f, block=self.block,
1025 pf = ParallelFunction(self, f, block=self.block,
995 bound=True, targets='all')
1026 bound=True, targets='all')
996 return pf.map(*sequences)
1027 return pf.map(*sequences)
997
1028
998 def parallel(self, bound=True, targets='all', block=True):
1029 def parallel(self, bound=True, targets='all', block=True):
999 """Decorator for making a ParallelFunction."""
1030 """Decorator for making a ParallelFunction."""
1000 return parallel(self, bound=bound, targets=targets, block=block)
1031 return parallel(self, bound=bound, targets=targets, block=block)
1001
1032
1002 def remote(self, bound=True, targets='all', block=True):
1033 def remote(self, bound=True, targets='all', block=True):
1003 """Decorator for making a RemoteFunction."""
1034 """Decorator for making a RemoteFunction."""
1004 return remote(self, bound=bound, targets=targets, block=block)
1035 return remote(self, bound=bound, targets=targets, block=block)
1005
1036
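# Usage sketch (illustrative, not part of this changeset): the same work
# via map() and via the remote() decorator. Assumes `rc` is an
# already-connected Client with running engines.
def square(x):
    return x ** 2

print(rc.map(square, range(8)))     # parallel version of the builtin map

@rc.remote(block=True)
def hostname():
    import socket
    return socket.gethostname()

print(hostname())                   # runs on all engines; per-engine results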
1006 #--------------------------------------------------------------------------
1037 #--------------------------------------------------------------------------
1007 # Data movement
1038 # Data movement
1008 #--------------------------------------------------------------------------
1039 #--------------------------------------------------------------------------
1009
1040
1010 @defaultblock
1041 @defaultblock
1011 def push(self, ns, targets='all', block=None):
1042 def push(self, ns, targets='all', block=None):
1012 """Push the contents of `ns` into the namespace on `target`"""
1043 """Push the contents of `ns` into the namespace on `target`"""
1013 if not isinstance(ns, dict):
1044 if not isinstance(ns, dict):
1014 raise TypeError("Must be a dict, not %s"%type(ns))
1045 raise TypeError("Must be a dict, not %s"%type(ns))
1015 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
1046 result = self.apply(_push, (ns,), targets=targets, block=block, bound=True)
1016 return result
1047 return result
1017
1048
1018 @defaultblock
1049 @defaultblock
1019 def pull(self, keys, targets='all', block=None):
1050 def pull(self, keys, targets='all', block=None):
1020 """Pull objects from `target`'s namespace by `keys`"""
1051 """Pull objects from `target`'s namespace by `keys`"""
1021 if isinstance(keys, str):
1052 if isinstance(keys, str):
1022 pass
1053 pass
1023 elif isinstance(keys, (list,tuple,set)):
1054 elif isinstance(keys, (list,tuple,set)):
1024 for key in keys:
1055 for key in keys:
1025 if not isinstance(key, str):
1056 if not isinstance(key, str):
1026 raise TypeError
1057 raise TypeError
1027 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
1058 result = self.apply(_pull, (keys,), targets=targets, block=block, bound=True)
1028 return result
1059 return result
1029
1060
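# Sketch (illustrative, not part of this changeset): push a dict of names
# out to engines, then pull them back. Assumes `rc` is a connected Client.
rc.push(dict(a=10, b=20), targets='all', block=True)
values = rc.pull(('a', 'b'), targets=0, block=True)   # values of a and b on engine 0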
1030 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
1061 def scatter(self, key, seq, dist='b', flatten=False, targets='all', block=None):
1031 """
1062 """
1032 Partition a Python sequence and send the partitions to a set of engines.
1063 Partition a Python sequence and send the partitions to a set of engines.
1033 """
1064 """
1034 block = block if block is not None else self.block
1065 block = block if block is not None else self.block
1035 targets = self._build_targets(targets)[-1]
1066 targets = self._build_targets(targets)[-1]
1036 mapObject = Map.dists[dist]()
1067 mapObject = Map.dists[dist]()
1037 nparts = len(targets)
1068 nparts = len(targets)
1038 msg_ids = []
1069 msg_ids = []
1039 for index, engineid in enumerate(targets):
1070 for index, engineid in enumerate(targets):
1040 partition = mapObject.getPartition(seq, index, nparts)
1071 partition = mapObject.getPartition(seq, index, nparts)
1041 if flatten and len(partition) == 1:
1072 if flatten and len(partition) == 1:
1042 r = self.push({key: partition[0]}, targets=engineid, block=False)
1073 r = self.push({key: partition[0]}, targets=engineid, block=False)
1043 else:
1074 else:
1044 r = self.push({key: partition}, targets=engineid, block=False)
1075 r = self.push({key: partition}, targets=engineid, block=False)
1045 msg_ids.extend(r.msg_ids)
1076 msg_ids.extend(r.msg_ids)
1046 r = AsyncResult(self, msg_ids, fname='scatter')
1077 r = AsyncResult(self, msg_ids, fname='scatter')
1047 if block:
1078 if block:
1048 return r.get()
1079 return r.get()
1049 else:
1080 else:
1050 return r
1081 return r
1051
1082
1052 def gather(self, key, dist='b', targets='all', block=None):
1083 def gather(self, key, dist='b', targets='all', block=None):
1053 """
1084 """
1054 Gather a partitioned sequence on a set of engines as a single local seq.
1085 Gather a partitioned sequence on a set of engines as a single local seq.
1055 """
1086 """
1056 block = block if block is not None else self.block
1087 block = block if block is not None else self.block
1057
1088
1058 targets = self._build_targets(targets)[-1]
1089 targets = self._build_targets(targets)[-1]
1059 mapObject = Map.dists[dist]()
1090 mapObject = Map.dists[dist]()
1060 msg_ids = []
1091 msg_ids = []
1061 for index, engineid in enumerate(targets):
1092 for index, engineid in enumerate(targets):
1062 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
1093 msg_ids.extend(self.pull(key, targets=engineid,block=False).msg_ids)
1063
1094
1064 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
1095 r = AsyncMapResult(self, msg_ids, mapObject, fname='gather')
1065 if block:
1096 if block:
1066 return r.get()
1097 return r.get()
1067 else:
1098 else:
1068 return r
1099 return r
1069
1100
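# Round-trip sketch (illustrative, not part of this changeset):
# block-partition a sequence across the engines with scatter, then
# reassemble it locally with gather. Assumes `rc` is a connected Client.
rc.scatter('x', range(12), dist='b', block=True)
assert list(rc.gather('x', dist='b', block=True)) == list(range(12))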
1070 #--------------------------------------------------------------------------
1101 #--------------------------------------------------------------------------
1071 # Query methods
1102 # Query methods
1072 #--------------------------------------------------------------------------
1103 #--------------------------------------------------------------------------
1073
1104
1074 @spinfirst
1105 @spinfirst
1075 def get_results(self, msg_ids, status_only=False):
1106 def get_results(self, msg_ids, status_only=False):
1076 """Returns the result of the execute or task request with `msg_ids`.
1107 """Returns the result of the execute or task request with `msg_ids`.
1077
1108
1078 Parameters
1109 Parameters
1079 ----------
1110 ----------
1080 msg_ids : list of ints or msg_ids
1111 msg_ids : list of ints or msg_ids
1081 if int:
1112 if int:
1082 Passed as index to self.history for convenience.
1113 Passed as index to self.history for convenience.
1083 status_only : bool (default: False)
1114 status_only : bool (default: False)
1084 if False:
1115 if False:
1085 return the actual results
1116 return the actual results
1086
1117
1087 Returns
1118 Returns
1088 -------
1119 -------
1089
1120
1090 results : dict
1121 results : dict
1091 There will always be the keys 'pending' and 'completed', which will
1122 There will always be the keys 'pending' and 'completed', which will
1092 be lists of msg_ids.
1123 be lists of msg_ids.
1093 """
1124 """
1094 if not isinstance(msg_ids, (list,tuple)):
1125 if not isinstance(msg_ids, (list,tuple)):
1095 msg_ids = [msg_ids]
1126 msg_ids = [msg_ids]
1096 theids = []
1127 theids = []
1097 for msg_id in msg_ids:
1128 for msg_id in msg_ids:
1098 if isinstance(msg_id, int):
1129 if isinstance(msg_id, int):
1099 msg_id = self.history[msg_id]
1130 msg_id = self.history[msg_id]
1100 if not isinstance(msg_id, str):
1131 if not isinstance(msg_id, str):
1101 raise TypeError("msg_ids must be str, not %r"%msg_id)
1132 raise TypeError("msg_ids must be str, not %r"%msg_id)
1102 theids.append(msg_id)
1133 theids.append(msg_id)
1103
1134
1104 completed = []
1135 completed = []
1105 local_results = {}
1136 local_results = {}
1106 # temporarily disable local shortcut
1137
1107 # for msg_id in list(theids):
1138 # comment this block out to temporarily disable local shortcut:
1108 # if msg_id in self.results:
1139 for msg_id in list(theids):
1109 # completed.append(msg_id)
1140 if msg_id in self.results:
1110 # local_results[msg_id] = self.results[msg_id]
1141 completed.append(msg_id)
1111 # theids.remove(msg_id)
1142 local_results[msg_id] = self.results[msg_id]
1143 theids.remove(msg_id)
1112
1144
1113 if theids: # some not locally cached
1145 if theids: # some not locally cached
1114 content = dict(msg_ids=theids, status_only=status_only)
1146 content = dict(msg_ids=theids, status_only=status_only)
1115 msg = self.session.send(self._query_socket, "result_request", content=content)
1147 msg = self.session.send(self._query_socket, "result_request", content=content)
1116 zmq.select([self._query_socket], [], [])
1148 zmq.select([self._query_socket], [], [])
1117 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1149 idents,msg = self.session.recv(self._query_socket, zmq.NOBLOCK)
1118 if self.debug:
1150 if self.debug:
1119 pprint(msg)
1151 pprint(msg)
1120 content = msg['content']
1152 content = msg['content']
1121 if content['status'] != 'ok':
1153 if content['status'] != 'ok':
1122 raise ss.unwrap_exception(content)
1154 raise ss.unwrap_exception(content)
1123 buffers = msg['buffers']
1155 buffers = msg['buffers']
1124 else:
1156 else:
1125 content = dict(completed=[],pending=[])
1157 content = dict(completed=[],pending=[])
1126
1158
1127 content['completed'].extend(completed)
1159 content['completed'].extend(completed)
1128
1160
1129 if status_only:
1161 if status_only:
1130 return content
1162 return content
1131
1163
1132 failures = []
1164 failures = []
1133 # load cached results into result:
1165 # load cached results into result:
1134 content.update(local_results)
1166 content.update(local_results)
1135 # update cache with results:
1167 # update cache with results:
1136 for msg_id in sorted(theids):
1168 for msg_id in sorted(theids):
1137 if msg_id in content['completed']:
1169 if msg_id in content['completed']:
1138 rec = content[msg_id]
1170 rec = content[msg_id]
1139 parent = rec['header']
1171 parent = rec['header']
1140 header = rec['result_header']
1172 header = rec['result_header']
1141 rcontent = rec['result_content']
1173 rcontent = rec['result_content']
1142 iodict = rec['io']
1174 iodict = rec['io']
1143 if isinstance(rcontent, str):
1175 if isinstance(rcontent, str):
1144 rcontent = self.session.unpack(rcontent)
1176 rcontent = self.session.unpack(rcontent)
1145
1177
1146 md = self.metadata.setdefault(msg_id, Metadata())
1178 md = self.metadata.setdefault(msg_id, Metadata())
1147 md.update(self._extract_metadata(header, parent, rcontent))
1179 md.update(self._extract_metadata(header, parent, rcontent))
1148 md.update(iodict)
1180 md.update(iodict)
1149
1181
1150 if rcontent['status'] == 'ok':
1182 if rcontent['status'] == 'ok':
1151 res,buffers = ss.unserialize_object(buffers)
1183 res,buffers = ss.unserialize_object(buffers)
1152 else:
1184 else:
1153 res = ss.unwrap_exception(rcontent)
1185 res = ss.unwrap_exception(rcontent)
1154 failures.append(res)
1186 failures.append(res)
1155
1187
1156 self.results[msg_id] = res
1188 self.results[msg_id] = res
1157 content[msg_id] = res
1189 content[msg_id] = res
1158
1190
1159 error.collect_exceptions(failures, "get_results")
1191 error.collect_exceptions(failures, "get_results")
1160 return content
1192 return content
1161
1193
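# Sketch (illustrative, not part of this changeset): poll cheaply with
# status_only, or fetch full results; ints index into rc.history.
status = rc.get_results(rc.history[-1], status_only=True)
print(status['pending'])                  # 'pending'/'completed' are lists of msg_ids
full = rc.get_results(-1)                 # same request, actual results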
1162 @spinfirst
1194 @spinfirst
1163 def queue_status(self, targets=None, verbose=False):
1195 def queue_status(self, targets=None, verbose=False):
1164 """Fetch the status of engine queues.
1196 """Fetch the status of engine queues.
1165
1197
1166 Parameters
1198 Parameters
1167 ----------
1199 ----------
1168 targets : int/str/list of ints/strs
1200 targets : int/str/list of ints/strs
1169 the engines on which to execute
1201 the engines on which to execute
1170 default : all
1202 default : all
1171 verbose : bool
1203 verbose : bool
1172 Whether to return lengths only, or lists of ids for each element
1204 Whether to return lengths only, or lists of ids for each element
1173 """
1205 """
1174 targets = self._build_targets(targets)[1]
1206 targets = self._build_targets(targets)[1]
1175 content = dict(targets=targets, verbose=verbose)
1207 content = dict(targets=targets, verbose=verbose)
1176 self.session.send(self._query_socket, "queue_request", content=content)
1208 self.session.send(self._query_socket, "queue_request", content=content)
1177 idents,msg = self.session.recv(self._query_socket, 0)
1209 idents,msg = self.session.recv(self._query_socket, 0)
1178 if self.debug:
1210 if self.debug:
1179 pprint(msg)
1211 pprint(msg)
1180 content = msg['content']
1212 content = msg['content']
1181 status = content.pop('status')
1213 status = content.pop('status')
1182 if status != 'ok':
1214 if status != 'ok':
1183 raise ss.unwrap_exception(content)
1215 raise ss.unwrap_exception(content)
1184 return ss.rekey(content)
1216 return ss.rekey(content)
1185
1217
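# Sketch (illustrative, not part of this changeset): queue depths per
# engine; treat the exact record shape as an assumption of this example.
qs = rc.queue_status()                              # lengths only, keyed by engine
qs_v = rc.queue_status(targets=[0], verbose=True)   # lists of msg_ids instead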
1186 @spinfirst
1218 @spinfirst
1187 def purge_results(self, msg_ids=[], targets=[]):
1219 def purge_results(self, msg_ids=[], targets=[]):
1188 """Tell the controller to forget results.
1220 """Tell the controller to forget results.
1189
1221
1190 Individual results can be purged by msg_id, or the entire
1222 Individual results can be purged by msg_id, or the entire
1191 history of specific targets can be purged.
1223 history of specific targets can be purged.
1192
1224
1193 Parameters
1225 Parameters
1194 ----------
1226 ----------
1195 msg_ids : str or list of strs
1227 msg_ids : str or list of strs
1196 the msg_ids whose results should be forgotten.
1228 the msg_ids whose results should be forgotten.
1197 targets : int/str/list of ints/strs
1229 targets : int/str/list of ints/strs
1198 The targets, by uuid or int_id, whose entire history is to be purged.
1230 The targets, by uuid or int_id, whose entire history is to be purged.
1199 Use `targets='all'` to scrub everything from the controller's memory.
1231 Use `targets='all'` to scrub everything from the controller's memory.
1200
1232
1201 default : None
1233 default : None
1202 """
1234 """
1203 if not targets and not msg_ids:
1235 if not targets and not msg_ids:
1204 raise ValueError
1236 raise ValueError
1205 if targets:
1237 if targets:
1206 targets = self._build_targets(targets)[1]
1238 targets = self._build_targets(targets)[1]
1207 content = dict(targets=targets, msg_ids=msg_ids)
1239 content = dict(targets=targets, msg_ids=msg_ids)
1208 self.session.send(self._query_socket, "purge_request", content=content)
1240 self.session.send(self._query_socket, "purge_request", content=content)
1209 idents, msg = self.session.recv(self._query_socket, 0)
1241 idents, msg = self.session.recv(self._query_socket, 0)
1210 if self.debug:
1242 if self.debug:
1211 pprint(msg)
1243 pprint(msg)
1212 content = msg['content']
1244 content = msg['content']
1213 if content['status'] != 'ok':
1245 if content['status'] != 'ok':
1214 raise ss.unwrap_exception(content)
1246 raise ss.unwrap_exception(content)
1215
1247
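# Sketch (illustrative, not part of this changeset): reclaim controller
# memory for finished work. Assumes `rc` is a connected Client.
rc.purge_results(msg_ids=rc.history[:10])   # forget specific msg_ids
rc.purge_results(targets='all')             # or scrub every engine's history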
1216 #----------------------------------------
1248 #----------------------------------------
1217 # activate for %px,%autopx magics
1249 # activate for %px,%autopx magics
1218 #----------------------------------------
1250 #----------------------------------------
1219 def activate(self):
1251 def activate(self):
1220 """Make this `View` active for parallel magic commands.
1252 """Make this `View` active for parallel magic commands.
1221
1253
1222 IPython has a magic command syntax to work with `MultiEngineClient` objects.
1254 IPython has a magic command syntax to work with `MultiEngineClient` objects.
1223 In a given IPython session there is a single active one. While
1255 In a given IPython session there is a single active one. While
1224 there can be many `Views` created and used by the user,
1256 there can be many `Views` created and used by the user,
1225 there is only one active one. The active `View` is used whenever
1257 there is only one active one. The active `View` is used whenever
1226 the magic commands %px and %autopx are used.
1258 the magic commands %px and %autopx are used.
1227
1259
1228 The activate() method is called on a given `View` to make it
1260 The activate() method is called on a given `View` to make it
1229 active. Once this has been done, the magic commands can be used.
1261 active. Once this has been done, the magic commands can be used.
1230 """
1262 """
1231
1263
1232 try:
1264 try:
1233 # This is injected into __builtins__.
1265 # This is injected into __builtins__.
1234 ip = get_ipython()
1266 ip = get_ipython()
1235 except NameError:
1267 except NameError:
1236 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1268 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
1237 else:
1269 else:
1238 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1270 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
1239 if pmagic is not None:
1271 if pmagic is not None:
1240 pmagic.active_multiengine_client = self
1272 pmagic.active_multiengine_client = self
1241 else:
1273 else:
1242 print "You must first load the parallelmagic extension " \
1274 print "You must first load the parallelmagic extension " \
1243 "by doing '%load_ext parallelmagic'"
1275 "by doing '%load_ext parallelmagic'"
1244
1276
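# Interactive sketch (illustrative, not part of this changeset), inside
# an IPython session:
# In [1]: %load_ext parallelmagic
# In [2]: rc.activate()       # make this client the target of %px/%autopx
# In [3]: %px import numpy    # now runs on every active engine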
1245 class AsynClient(Client):
1277 class AsynClient(Client):
1246 """An Asynchronous client, using the Tornado Event Loop.
1278 """An Asynchronous client, using the Tornado Event Loop.
1247 !!!unfinished!!!"""
1279 !!!unfinished!!!"""
1248 io_loop = None
1280 io_loop = None
1249 _queue_stream = None
1281 _queue_stream = None
1250 _notification_stream = None
1282 _notification_stream = None
1251 _task_stream = None
1283 _task_stream = None
1252 _control_stream = None
1284 _control_stream = None
1253
1285
1254 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
1286 def __init__(self, addr, context=None, username=None, debug=False, io_loop=None):
1255 Client.__init__(self, addr, context, username, debug)
1287 Client.__init__(self, addr, context, username, debug)
1256 if io_loop is None:
1288 if io_loop is None:
1257 io_loop = ioloop.IOLoop.instance()
1289 io_loop = ioloop.IOLoop.instance()
1258 self.io_loop = io_loop
1290 self.io_loop = io_loop
1259
1291
1260 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
1292 self._queue_stream = zmqstream.ZMQStream(self._mux_socket, io_loop)
1261 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
1293 self._control_stream = zmqstream.ZMQStream(self._control_socket, io_loop)
1262 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
1294 self._task_stream = zmqstream.ZMQStream(self._task_socket, io_loop)
1263 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
1295 self._notification_stream = zmqstream.ZMQStream(self._notification_socket, io_loop)
1264
1296
1265 def spin(self):
1297 def spin(self):
1266 for stream in (self._queue_stream, self._notification_stream,
1298 for stream in (self._queue_stream, self._notification_stream,
1267 self._task_stream, self._control_stream):
1299 self._task_stream, self._control_stream):
1268 stream.flush()
1300 stream.flush()
1269
1301
1270 __all__ = [ 'Client',
1302 __all__ = [ 'Client',
1271 'depend',
1303 'depend',
1272 'require',
1304 'require',
1273 'remote',
1305 'remote',
1274 'parallel',
1306 'parallel',
1275 'RemoteFunction',
1307 'RemoteFunction',
1276 'ParallelFunction',
1308 'ParallelFunction',
1277 'DirectView',
1309 'DirectView',
1278 'LoadBalancedView',
1310 'LoadBalancedView',
1279 'AsyncResult',
1311 'AsyncResult',
1280 'AsyncMapResult'
1312 'AsyncMapResult'
1281 ]
1313 ]
@@ -1,108 +1,108 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller with 0MQ
2 """The IPython Controller with 0MQ
3 This is a collection of one Hub and several Schedulers.
3 This is a collection of one Hub and several Schedulers.
4 """
4 """
5 #-----------------------------------------------------------------------------
5 #-----------------------------------------------------------------------------
6 # Copyright (C) 2010 The IPython Development Team
6 # Copyright (C) 2010 The IPython Development Team
7 #
7 #
8 # Distributed under the terms of the BSD License. The full license is in
8 # Distributed under the terms of the BSD License. The full license is in
9 # the file COPYING, distributed as part of this software.
9 # the file COPYING, distributed as part of this software.
10 #-----------------------------------------------------------------------------
10 #-----------------------------------------------------------------------------
11
11
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13 # Imports
13 # Imports
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 from __future__ import print_function
15 from __future__ import print_function
16
16
17 import logging
17 import logging
18 from multiprocessing import Process
18 from multiprocessing import Process
19
19
20 import zmq
20 import zmq
21
21
22 # internal:
22 # internal:
23 from IPython.utils.importstring import import_item
23 from IPython.utils.importstring import import_item
24 from IPython.utils.traitlets import Int, Str, Instance, List, Bool
24 from IPython.utils.traitlets import Int, Str, Instance, List, Bool
25
25
26 from entry_point import signal_children
26 from entry_point import signal_children
27
27
28
28
29 from scheduler import launch_scheduler
29 from scheduler import launch_scheduler
30 from hub import Hub, HubFactory
30 from hub import Hub, HubFactory
31
31
32 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
33 # Configurable
33 # Configurable
34 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
35
35
36
36
37 class ControllerFactory(HubFactory):
37 class ControllerFactory(HubFactory):
38 """Configurable for setting up a Hub and Schedulers."""
38 """Configurable for setting up a Hub and Schedulers."""
39
39
40 scheme = Str('pure', config=True)
41 usethreads = Bool(False, config=True)
40 usethreads = Bool(False, config=True)
42
41
43 # internal
42 # internal
44 children = List()
43 children = List()
45 mq_class = Str('zmq.devices.ProcessMonitoredQueue')
44 mq_class = Str('zmq.devices.ProcessMonitoredQueue')
46
45
47 def _usethreads_changed(self, name, old, new):
46 def _usethreads_changed(self, name, old, new):
48 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
47 self.mq_class = 'zmq.devices.%sMonitoredQueue'%('Thread' if new else 'Process')
49
48
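# Sketch (illustrative, not part of this changeset): the handler above
# only rewrites mq_class by name.
for threaded in (True, False):
    print('zmq.devices.%sMonitoredQueue' % ('Thread' if threaded else 'Process'))
# -> zmq.devices.ThreadMonitoredQueue, then zmq.devices.ProcessMonitoredQueue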
50 def __init__(self, **kwargs):
49 def __init__(self, **kwargs):
51 super(ControllerFactory, self).__init__(**kwargs)
50 super(ControllerFactory, self).__init__(**kwargs)
52 self.subconstructors.append(self.construct_schedulers)
51 self.subconstructors.append(self.construct_schedulers)
53
52
54 def start(self):
53 def start(self):
55 super(ControllerFactory, self).start()
54 super(ControllerFactory, self).start()
56 for child in self.children:
55 for child in self.children:
57 child.start()
56 child.start()
58 if not self.usethreads:
57 if not self.usethreads:
59 signal_children([ getattr(c, 'launcher', c) for c in self.children ])
58 signal_children([ getattr(c, 'launcher', c) for c in self.children ])
60
59
61
60
62 def construct_schedulers(self):
61 def construct_schedulers(self):
63 children = self.children
62 children = self.children
64 mq = import_item(self.mq_class)
63 mq = import_item(self.mq_class)
65
64
66 # IOPub relay (in a Process)
65 # IOPub relay (in a Process)
67 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
66 q = mq(zmq.PUB, zmq.SUB, zmq.PUB, 'N/A','iopub')
68 q.bind_in(self.client_addrs['iopub'])
67 q.bind_in(self.client_info['iopub'])
69 q.bind_out(self.engine_addrs['iopub'])
68 q.bind_out(self.engine_info['iopub'])
70 q.setsockopt_out(zmq.SUBSCRIBE, '')
69 q.setsockopt_out(zmq.SUBSCRIBE, '')
71 q.connect_mon(self.monitor_url)
70 q.connect_mon(self.monitor_url)
72 q.daemon=True
71 q.daemon=True
73 children.append(q)
72 children.append(q)
74
73
75 # Multiplexer Queue (in a Process)
74 # Multiplexer Queue (in a Process)
76 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
75 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'in', 'out')
77 q.bind_in(self.client_addrs['mux'])
76 q.bind_in(self.client_info['mux'])
78 q.bind_out(self.engine_addrs['mux'])
77 q.bind_out(self.engine_info['mux'])
79 q.connect_mon(self.monitor_url)
78 q.connect_mon(self.monitor_url)
80 q.daemon=True
79 q.daemon=True
81 children.append(q)
80 children.append(q)
82
81
83 # Control Queue (in a Process)
82 # Control Queue (in a Process)
84 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
83 q = mq(zmq.XREP, zmq.XREP, zmq.PUB, 'incontrol', 'outcontrol')
85 q.bind_in(self.client_addrs['control'])
84 q.bind_in(self.client_info['control'])
86 q.bind_out(self.engine_addrs['control'])
85 q.bind_out(self.engine_info['control'])
87 q.connect_mon(self.monitor_url)
86 q.connect_mon(self.monitor_url)
88 q.daemon=True
87 q.daemon=True
89 children.append(q)
88 children.append(q)
90 # Task Queue (in a Process)
89 # Task Queue (in a Process)
91 if self.scheme == 'pure':
90 if self.scheme == 'pure':
92 self.log.warn("task::using pure XREQ Task scheduler")
91 self.log.warn("task::using pure XREQ Task scheduler")
93 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
92 q = mq(zmq.XREP, zmq.XREQ, zmq.PUB, 'intask', 'outtask')
94 q.bind_in(self.client_addrs['task'])
93 q.bind_in(self.client_info['task'][1])
95 q.bind_out(self.engine_addrs['task'])
94 q.bind_out(self.engine_info['task'])
96 q.connect_mon(self.monitor_url)
95 q.connect_mon(self.monitor_url)
97 q.daemon=True
96 q.daemon=True
98 children.append(q)
97 children.append(q)
99 elif self.scheme == 'none':
98 elif self.scheme == 'none':
100 self.log.warn("task::using no Task scheduler")
99 self.log.warn("task::using no Task scheduler")
101
100
102 else:
101 else:
103 self.log.info("task::using Python %s Task scheduler"%self.scheme)
102 self.log.info("task::using Python %s Task scheduler"%self.scheme)
104 sargs = (self.client_addrs['task'], self.engine_addrs['task'], self.monitor_url, self.client_addrs['notification'])
103 sargs = (self.client_info['task'], self.engine_info['task'], self.monitor_url, self.client_info['notification'])
105 q = Process(target=launch_scheduler, args=sargs, kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level))
104 kwargs = dict(scheme=self.scheme,logname=self.log.name, loglevel=self.log.level, config=self.config)
105 q = Process(target=launch_scheduler, args=sargs, kwargs=kwargs)
106 q.daemon=True
106 q.daemon=True
107 children.append(q)
107 children.append(q)
108
108
@@ -1,1045 +1,1053 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 """The IPython Controller Hub with 0MQ
2 """The IPython Controller Hub with 0MQ
3 This is the master object that handles connections from engines and clients,
3 This is the master object that handles connections from engines and clients,
4 and monitors traffic through the various queues.
4 and monitors traffic through the various queues.
5 """
5 """
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2010 The IPython Development Team
7 # Copyright (C) 2010 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16 from __future__ import print_function
16 from __future__ import print_function
17
17
18 import sys
18 import sys
19 from datetime import datetime
19 from datetime import datetime
20 import time
20 import time
21 import logging
21 import logging
22
22
23 import zmq
23 import zmq
24 from zmq.eventloop import ioloop
24 from zmq.eventloop import ioloop
25 from zmq.eventloop.zmqstream import ZMQStream
25 from zmq.eventloop.zmqstream import ZMQStream
26
26
27 # internal:
27 # internal:
28 from IPython.config.configurable import Configurable
28 from IPython.config.configurable import Configurable
29 from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict, Set, List, Bool
29 from IPython.utils.traitlets import HasTraits, Instance, Int, Str, Dict, Set, List, Bool
30 from IPython.utils.importstring import import_item
30 from IPython.utils.importstring import import_item
31
31
32 from entry_point import select_random_ports
32 from entry_point import select_random_ports
33 from factory import RegistrationFactory, LoggingFactory
33 from factory import RegistrationFactory, LoggingFactory
34
34
35 from streamsession import Message, wrap_exception, ISO8601
35 from streamsession import Message, wrap_exception, ISO8601
36 from heartmonitor import HeartMonitor
36 from heartmonitor import HeartMonitor
37 from util import validate_url_container
37 from util import validate_url_container
38
38
39 try:
39 try:
40 from pymongo.binary import Binary
40 from pymongo.binary import Binary
41 except ImportError:
41 except ImportError:
42 MongoDB=None
42 MongoDB=None
43 else:
43 else:
44 from mongodb import MongoDB
44 from mongodb import MongoDB
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Code
47 # Code
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50 def _passer(*args, **kwargs):
50 def _passer(*args, **kwargs):
51 return
51 return
52
52
53 def _printer(*args, **kwargs):
53 def _printer(*args, **kwargs):
54 print (args)
54 print (args)
55 print (kwargs)
55 print (kwargs)
56
56
57 def init_record(msg):
57 def init_record(msg):
58 """Initialize a TaskRecord based on a request."""
58 """Initialize a TaskRecord based on a request."""
59 header = msg['header']
59 header = msg['header']
60 return {
60 return {
61 'msg_id' : header['msg_id'],
61 'msg_id' : header['msg_id'],
62 'header' : header,
62 'header' : header,
63 'content': msg['content'],
63 'content': msg['content'],
64 'buffers': msg['buffers'],
64 'buffers': msg['buffers'],
65 'submitted': datetime.strptime(header['date'], ISO8601),
65 'submitted': datetime.strptime(header['date'], ISO8601),
66 'client_uuid' : None,
66 'client_uuid' : None,
67 'engine_uuid' : None,
67 'engine_uuid' : None,
68 'started': None,
68 'started': None,
69 'completed': None,
69 'completed': None,
70 'resubmitted': None,
70 'resubmitted': None,
71 'result_header' : None,
71 'result_header' : None,
72 'result_content' : None,
72 'result_content' : None,
73 'result_buffers' : None,
73 'result_buffers' : None,
74 'queue' : None,
74 'queue' : None,
75 'pyin' : None,
75 'pyin' : None,
76 'pyout': None,
76 'pyout': None,
77 'pyerr': None,
77 'pyerr': None,
78 'stdout': '',
78 'stdout': '',
79 'stderr': '',
79 'stderr': '',
80 }
80 }
81
81
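# Sketch (illustrative, not part of this changeset): a minimal request
# dict yields a record whose lifecycle fields start out empty; the
# header values here are hypothetical.
from datetime import datetime
from streamsession import ISO8601
msg = dict(header=dict(msg_id='abc', date=datetime.now().strftime(ISO8601)),
           content={}, buffers=[])
rec = init_record(msg)
assert rec['msg_id'] == 'abc' and rec['completed'] is None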
82
82
83 class EngineConnector(HasTraits):
83 class EngineConnector(HasTraits):
84 """A simple object for accessing the various zmq connections of an object.
84 """A simple object for accessing the various zmq connections of an object.
85 Attributes are:
85 Attributes are:
86 id (int): engine ID
86 id (int): engine ID
87 uuid (str): uuid (unused?)
87 uuid (str): uuid (unused?)
88 queue (str): identity of queue's XREQ socket
88 queue (str): identity of queue's XREQ socket
89 registration (str): identity of registration XREQ socket
89 registration (str): identity of registration XREQ socket
90 heartbeat (str): identity of heartbeat XREQ socket
90 heartbeat (str): identity of heartbeat XREQ socket
91 """
91 """
92 id=Int(0)
92 id=Int(0)
93 queue=Str()
93 queue=Str()
94 control=Str()
94 control=Str()
95 registration=Str()
95 registration=Str()
96 heartbeat=Str()
96 heartbeat=Str()
97 pending=Set()
97 pending=Set()
98
98
99 class HubFactory(RegistrationFactory):
99 class HubFactory(RegistrationFactory):
100 """The Configurable for setting up a Hub."""
100 """The Configurable for setting up a Hub."""
101
101
102 # name of a scheduler scheme
103 scheme = Str('lru', config=True)
104
102 # port-pairs for monitoredqueues:
105 # port-pairs for monitoredqueues:
103 hb = Instance(list, config=True)
106 hb = Instance(list, config=True)
104 def _hb_default(self):
107 def _hb_default(self):
105 return select_random_ports(2)
108 return select_random_ports(2)
106
109
107 mux = Instance(list, config=True)
110 mux = Instance(list, config=True)
108 def _mux_default(self):
111 def _mux_default(self):
109 return select_random_ports(2)
112 return select_random_ports(2)
110
113
111 task = Instance(list, config=True)
114 task = Instance(list, config=True)
112 def _task_default(self):
115 def _task_default(self):
113 return select_random_ports(2)
116 return select_random_ports(2)
114
117
115 control = Instance(list, config=True)
118 control = Instance(list, config=True)
116 def _control_default(self):
119 def _control_default(self):
117 return select_random_ports(2)
120 return select_random_ports(2)
118
121
119 iopub = Instance(list, config=True)
122 iopub = Instance(list, config=True)
120 def _iopub_default(self):
123 def _iopub_default(self):
121 return select_random_ports(2)
124 return select_random_ports(2)
122
125
123 # single ports:
126 # single ports:
124 mon_port = Instance(int, config=True)
127 mon_port = Instance(int, config=True)
125 def _mon_port_default(self):
128 def _mon_port_default(self):
126 return select_random_ports(1)[0]
129 return select_random_ports(1)[0]
127
130
128 query_port = Instance(int, config=True)
131 query_port = Instance(int, config=True)
129 def _query_port_default(self):
132 def _query_port_default(self):
130 return select_random_ports(1)[0]
133 return select_random_ports(1)[0]
131
134
132 notifier_port = Instance(int, config=True)
135 notifier_port = Instance(int, config=True)
133 def _notifier_port_default(self):
136 def _notifier_port_default(self):
134 return select_random_ports(1)[0]
137 return select_random_ports(1)[0]
135
138
136 ping = Int(1000, config=True) # ping frequency
139 ping = Int(1000, config=True) # ping frequency
137
140
138 engine_ip = Str('127.0.0.1', config=True)
141 engine_ip = Str('127.0.0.1', config=True)
139 engine_transport = Str('tcp', config=True)
142 engine_transport = Str('tcp', config=True)
140
143
141 client_ip = Str('127.0.0.1', config=True)
144 client_ip = Str('127.0.0.1', config=True)
142 client_transport = Str('tcp', config=True)
145 client_transport = Str('tcp', config=True)
143
146
144 monitor_ip = Str('127.0.0.1', config=True)
147 monitor_ip = Str('127.0.0.1', config=True)
145 monitor_transport = Str('tcp', config=True)
148 monitor_transport = Str('tcp', config=True)
146
149
147 monitor_url = Str('')
150 monitor_url = Str('')
148
151
149 db_class = Str('IPython.zmq.parallel.dictdb.DictDB', config=True)
152 db_class = Str('IPython.zmq.parallel.dictdb.DictDB', config=True)
150
153
151 # not configurable
154 # not configurable
152 db = Instance('IPython.zmq.parallel.dictdb.BaseDB')
155 db = Instance('IPython.zmq.parallel.dictdb.BaseDB')
153 heartmonitor = Instance('IPython.zmq.parallel.heartmonitor.HeartMonitor')
156 heartmonitor = Instance('IPython.zmq.parallel.heartmonitor.HeartMonitor')
154 subconstructors = List()
157 subconstructors = List()
155 _constructed = Bool(False)
158 _constructed = Bool(False)
156
159
157 def _ip_changed(self, name, old, new):
160 def _ip_changed(self, name, old, new):
158 self.engine_ip = new
161 self.engine_ip = new
159 self.client_ip = new
162 self.client_ip = new
160 self.monitor_ip = new
163 self.monitor_ip = new
161 self._update_monitor_url()
164 self._update_monitor_url()
162
165
163 def _update_monitor_url(self):
166 def _update_monitor_url(self):
164 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
167 self.monitor_url = "%s://%s:%i"%(self.monitor_transport, self.monitor_ip, self.mon_port)
165
168
166 def _transport_changed(self, name, old, new):
169 def _transport_changed(self, name, old, new):
167 self.engine_transport = new
170 self.engine_transport = new
168 self.client_transport = new
171 self.client_transport = new
169 self.monitor_transport = new
172 self.monitor_transport = new
170 self._update_monitor_url()
173 self._update_monitor_url()
171
174
172 def __init__(self, **kwargs):
175 def __init__(self, **kwargs):
173 super(HubFactory, self).__init__(**kwargs)
176 super(HubFactory, self).__init__(**kwargs)
174 self._update_monitor_url()
177 self._update_monitor_url()
175 # self.on_trait_change(self._sync_ips, 'ip')
178 # self.on_trait_change(self._sync_ips, 'ip')
176 # self.on_trait_change(self._sync_transports, 'transport')
179 # self.on_trait_change(self._sync_transports, 'transport')
177 self.subconstructors.append(self.construct_hub)
180 self.subconstructors.append(self.construct_hub)
178
181
179
182
180 def construct(self):
183 def construct(self):
181 assert not self._constructed, "already constructed!"
184 assert not self._constructed, "already constructed!"
182
185
183 for subc in self.subconstructors:
186 for subc in self.subconstructors:
184 subc()
187 subc()
185
188
186 self._constructed = True
189 self._constructed = True
187
190
188
191
189 def start(self):
192 def start(self):
190 assert self._constructed, "must be constructed by self.construct() first!"
193 assert self._constructed, "must be constructed by self.construct() first!"
191 self.heartmonitor.start()
194 self.heartmonitor.start()
192 self.log.info("Heartmonitor started")
195 self.log.info("Heartmonitor started")
193
196
194 def construct_hub(self):
197 def construct_hub(self):
195 """construct"""
198 """construct"""
196 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
199 client_iface = "%s://%s:"%(self.client_transport, self.client_ip) + "%i"
197 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
200 engine_iface = "%s://%s:"%(self.engine_transport, self.engine_ip) + "%i"
198
201
199 ctx = self.context
202 ctx = self.context
200 loop = self.loop
203 loop = self.loop
201
204
202 # Registrar socket
205 # Registrar socket
203 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
206 reg = ZMQStream(ctx.socket(zmq.XREP), loop)
204 reg.bind(client_iface % self.regport)
207 reg.bind(client_iface % self.regport)
205 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
208 self.log.info("Hub listening on %s for registration."%(client_iface%self.regport))
206 if self.client_ip != self.engine_ip:
209 if self.client_ip != self.engine_ip:
207 reg.bind(engine_iface % self.regport)
210 reg.bind(engine_iface % self.regport)
208 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
211 self.log.info("Hub listening on %s for registration."%(engine_iface%self.regport))
209
212
210 ### Engine connections ###
213 ### Engine connections ###
211
214
212 # heartbeat
215 # heartbeat
213 hpub = ctx.socket(zmq.PUB)
216 hpub = ctx.socket(zmq.PUB)
214 hpub.bind(engine_iface % self.hb[0])
217 hpub.bind(engine_iface % self.hb[0])
215 hrep = ctx.socket(zmq.XREP)
218 hrep = ctx.socket(zmq.XREP)
216 hrep.bind(engine_iface % self.hb[1])
219 hrep.bind(engine_iface % self.hb[1])
217 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
220 self.heartmonitor = HeartMonitor(loop=loop, pingstream=ZMQStream(hpub,loop), pongstream=ZMQStream(hrep,loop),
218 period=self.ping, logname=self.log.name)
221 period=self.ping, logname=self.log.name)
219
222
220 ### Client connections ###
223 ### Client connections ###
221 # Clientele socket
224 # Clientele socket
222 c = ZMQStream(ctx.socket(zmq.XREP), loop)
225 c = ZMQStream(ctx.socket(zmq.XREP), loop)
223 c.bind(client_iface%self.query_port)
226 c.bind(client_iface%self.query_port)
224 # Notifier socket
227 # Notifier socket
225 n = ZMQStream(ctx.socket(zmq.PUB), loop)
228 n = ZMQStream(ctx.socket(zmq.PUB), loop)
226 n.bind(client_iface%self.notifier_port)
229 n.bind(client_iface%self.notifier_port)
227
230
228 ### build and launch the queues ###
231 ### build and launch the queues ###
229
232
230 # monitor socket
233 # monitor socket
231 sub = ctx.socket(zmq.SUB)
234 sub = ctx.socket(zmq.SUB)
232 sub.setsockopt(zmq.SUBSCRIBE, "")
235 sub.setsockopt(zmq.SUBSCRIBE, "")
233 sub.bind(self.monitor_url)
236 sub.bind(self.monitor_url)
234 sub = ZMQStream(sub, loop)
237 sub = ZMQStream(sub, loop)
235
238
236 # connect the db
239 # connect the db
237 self.db = import_item(self.db_class)()
240 self.db = import_item(self.db_class)()
238 time.sleep(.25)
241 time.sleep(.25)
239
242
240 # build connection dicts
243 # build connection dicts
241 self.engine_addrs = {
244 self.engine_info = {
242 'control' : engine_iface%self.control[1],
245 'control' : engine_iface%self.control[1],
243 'mux': engine_iface%self.mux[1],
246 'mux': engine_iface%self.mux[1],
244 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
247 'heartbeat': (engine_iface%self.hb[0], engine_iface%self.hb[1]),
245 'task' : engine_iface%self.task[1],
248 'task' : engine_iface%self.task[1],
246 'iopub' : engine_iface%self.iopub[1],
249 'iopub' : engine_iface%self.iopub[1],
247 # 'monitor' : engine_iface%self.mon_port,
250 # 'monitor' : engine_iface%self.mon_port,
248 }
251 }
249
252
250 self.client_addrs = {
253 self.client_info = {
251 'control' : client_iface%self.control[0],
254 'control' : client_iface%self.control[0],
252 'query': client_iface%self.query_port,
255 'query': client_iface%self.query_port,
253 'mux': client_iface%self.mux[0],
256 'mux': client_iface%self.mux[0],
254 'task' : client_iface%self.task[0],
257 'task' : (self.scheme, client_iface%self.task[0]),
255 'iopub' : client_iface%self.iopub[0],
258 'iopub' : client_iface%self.iopub[0],
256 'notification': client_iface%self.notifier_port
259 'notification': client_iface%self.notifier_port
257 }
260 }
258 self.log.debug("hub::Hub engine addrs: %s"%self.engine_addrs)
261 self.log.debug("hub::Hub engine addrs: %s"%self.engine_info)
259 self.log.debug("hub::Hub client addrs: %s"%self.client_addrs)
262 self.log.debug("hub::Hub client addrs: %s"%self.client_info)
260 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
263 self.hub = Hub(loop=loop, session=self.session, monitor=sub, heartmonitor=self.heartmonitor,
261 registrar=reg, clientele=c, notifier=n, db=self.db,
264 registrar=reg, clientele=c, notifier=n, db=self.db,
262 engine_addrs=self.engine_addrs, client_addrs=self.client_addrs,
265 engine_info=self.engine_info, client_info=self.client_info,
263 logname=self.log.name)
266 logname=self.log.name)
264
267
265
268
266 class Hub(LoggingFactory):
269 class Hub(LoggingFactory):
267 """The IPython Controller Hub with 0MQ connections
270 """The IPython Controller Hub with 0MQ connections
268
271
269 Parameters
272 Parameters
270 ==========
273 ==========
271 loop: zmq IOLoop instance
274 loop: zmq IOLoop instance
272 session: StreamSession object
275 session: StreamSession object
273 <removed> context: zmq context for creating new connections (?)
276 <removed> context: zmq context for creating new connections (?)
274 queue: ZMQStream for monitoring the command queue (SUB)
277 queue: ZMQStream for monitoring the command queue (SUB)
275 registrar: ZMQStream for engine registration requests (XREP)
278 registrar: ZMQStream for engine registration requests (XREP)
276 heartbeat: HeartMonitor object checking the pulse of the engines
279 heartbeat: HeartMonitor object checking the pulse of the engines
277 clientele: ZMQStream for client connections (XREP)
280 clientele: ZMQStream for client connections (XREP)
278 not used for jobs, only query/control commands
281 not used for jobs, only query/control commands
279 notifier: ZMQStream for broadcasting engine registration changes (PUB)
282 notifier: ZMQStream for broadcasting engine registration changes (PUB)
280 db: connection to db for out of memory logging of commands
283 db: connection to db for out of memory logging of commands
281 NotImplemented
284 NotImplemented
282 engine_addrs: dict of zmq connection information for engines to connect
285 engine_info: dict of zmq connection information for engines to connect
283 to the queues.
286 to the queues.
284 client_addrs: dict of zmq connection information for clients to connect
287 client_info: dict of zmq connection information for clients to connect
285 to the queues.
288 to the queues.
286 """
289 """
287 # internal data structures:
290 # internal data structures:
288 ids=Set() # engine IDs
291 ids=Set() # engine IDs
289 keytable=Dict()
292 keytable=Dict()
290 by_ident=Dict()
293 by_ident=Dict()
291 engines=Dict()
294 engines=Dict()
292 clients=Dict()
295 clients=Dict()
293 hearts=Dict()
296 hearts=Dict()
294 pending=Set()
297 pending=Set()
295 queues=Dict() # pending msg_ids keyed by engine_id
298 queues=Dict() # pending msg_ids keyed by engine_id
296 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
299 tasks=Dict() # pending msg_ids submitted as tasks, keyed by client_id
297 completed=Dict() # completed msg_ids keyed by engine_id
300 completed=Dict() # completed msg_ids keyed by engine_id
298 all_completed=Set() # completed msg_ids keyed by engine_id
301 all_completed=Set() # completed msg_ids keyed by engine_id
299 # mia=None
302 # mia=None
300 incoming_registrations=Dict()
303 incoming_registrations=Dict()
301 registration_timeout=Int()
304 registration_timeout=Int()
302 _idcounter=Int(0)
305 _idcounter=Int(0)
303
306
304 # objects from constructor:
307 # objects from constructor:
305 loop=Instance(ioloop.IOLoop)
308 loop=Instance(ioloop.IOLoop)
306 registrar=Instance(ZMQStream)
309 registrar=Instance(ZMQStream)
307 clientele=Instance(ZMQStream)
310 clientele=Instance(ZMQStream)
308 monitor=Instance(ZMQStream)
311 monitor=Instance(ZMQStream)
309 heartmonitor=Instance(HeartMonitor)
312 heartmonitor=Instance(HeartMonitor)
310 notifier=Instance(ZMQStream)
313 notifier=Instance(ZMQStream)
311 db=Instance(object)
314 db=Instance(object)
312 client_addrs=Dict()
315 client_info=Dict()
313 engine_addrs=Dict()
316 engine_info=Dict()
314
317
315
318
316 def __init__(self, **kwargs):
319 def __init__(self, **kwargs):
317 """
320 """
318 # universal:
321 # universal:
319 loop: IOLoop for creating future connections
322 loop: IOLoop for creating future connections
320 session: streamsession for sending serialized data
323 session: streamsession for sending serialized data
321 # engine:
324 # engine:
322 queue: ZMQStream for monitoring queue messages
325 queue: ZMQStream for monitoring queue messages
323 registrar: ZMQStream for engine registration
326 registrar: ZMQStream for engine registration
324 heartbeat: HeartMonitor object for tracking engines
327 heartbeat: HeartMonitor object for tracking engines
325 # client:
328 # client:
326 clientele: ZMQStream for client connections
329 clientele: ZMQStream for client connections
327 # extra:
330 # extra:
328 db: ZMQStream for db connection (NotImplemented)
331 db: ZMQStream for db connection (NotImplemented)
329 engine_addrs: zmq address/protocol dict for engine connections
332 engine_info: zmq address/protocol dict for engine connections
330 client_addrs: zmq address/protocol dict for client connections
333 client_info: zmq address/protocol dict for client connections
331 """
334 """
332
335
333 super(Hub, self).__init__(**kwargs)
336 super(Hub, self).__init__(**kwargs)
334 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
337 self.registration_timeout = max(5000, 2*self.heartmonitor.period)
335
338
336 # validate connection dicts:
339 # validate connection dicts:
337 validate_url_container(self.client_addrs)
340 for k,v in self.client_info.iteritems():
338 validate_url_container(self.engine_addrs)
341 if k == 'task':
342 validate_url_container(v[1])
343 else:
344 validate_url_container(v)
345 # validate_url_container(self.client_info)
346 validate_url_container(self.engine_info)
339
347
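# A minimal sketch (not the Hub's real code) of the special-case above:
# the 'task' entry in client_info pairs a scheduler scheme with its url,
# so only the url element is validated. All values here are illustrative.
client_info = {'mux': 'tcp://127.0.0.1:10101',
               'task': ('lru', 'tcp://127.0.0.1:10102')}
for k, v in client_info.items():
    url = v[1] if k == 'task' else v
    assert url.startswith('tcp://'), url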
340 # register our callbacks
348 # register our callbacks
341 self.registrar.on_recv(self.dispatch_register_request)
349 self.registrar.on_recv(self.dispatch_register_request)
342 self.clientele.on_recv(self.dispatch_client_msg)
350 self.clientele.on_recv(self.dispatch_client_msg)
343 self.monitor.on_recv(self.dispatch_monitor_traffic)
351 self.monitor.on_recv(self.dispatch_monitor_traffic)
344
352
345 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
353 self.heartmonitor.add_heart_failure_handler(self.handle_heart_failure)
346 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
354 self.heartmonitor.add_new_heart_handler(self.handle_new_heart)
347
355
348 self.monitor_handlers = { 'in' : self.save_queue_request,
356 self.monitor_handlers = { 'in' : self.save_queue_request,
349 'out': self.save_queue_result,
357 'out': self.save_queue_result,
350 'intask': self.save_task_request,
358 'intask': self.save_task_request,
351 'outtask': self.save_task_result,
359 'outtask': self.save_task_result,
352 'tracktask': self.save_task_destination,
360 'tracktask': self.save_task_destination,
353 'incontrol': _passer,
361 'incontrol': _passer,
354 'outcontrol': _passer,
362 'outcontrol': _passer,
355 'iopub': self.save_iopub_message,
363 'iopub': self.save_iopub_message,
356 }
364 }
357
365
358 self.client_handlers = {'queue_request': self.queue_status,
366 self.client_handlers = {'queue_request': self.queue_status,
359 'result_request': self.get_results,
367 'result_request': self.get_results,
360 'purge_request': self.purge_results,
368 'purge_request': self.purge_results,
361 'load_request': self.check_load,
369 'load_request': self.check_load,
362 'resubmit_request': self.resubmit_task,
370 'resubmit_request': self.resubmit_task,
363 'shutdown_request': self.shutdown_request,
371 'shutdown_request': self.shutdown_request,
364 }
372 }
365
373
366 self.registrar_handlers = {'registration_request' : self.register_engine,
374 self.registrar_handlers = {'registration_request' : self.register_engine,
367 'unregistration_request' : self.unregister_engine,
375 'unregistration_request' : self.unregister_engine,
368 'connection_request': self.connection_request,
376 'connection_request': self.connection_request,
369 }
377 }
370
378
371 self.log.info("hub::created hub")
379 self.log.info("hub::created hub")
372
380
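# A standalone sketch of the dispatch-table pattern wired up above: message
# types map to bound handlers, and unknown types are rejected. The handler
# names here are illustrative, not the Hub's real methods.
class Dispatcher(object):
    def __init__(self):
        self.handlers = {'ping': self.on_ping}
    def on_ping(self, ident, msg):
        return 'pong for %s' % ident
    def dispatch(self, msg_type, ident, msg):
        handler = self.handlers.get(msg_type, None)
        if handler is None:
            raise KeyError("Bad Message Type: %s" % msg_type)
        return handler(ident, msg)

print(Dispatcher().dispatch('ping', 'client-1', {}))   # pong for client-1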
373 @property
381 @property
374 def _next_id(self):
382 def _next_id(self):
375 """gemerate a new ID.
383 """gemerate a new ID.
376
384
377 No longer reuse old ids, just count from 0."""
385 No longer reuse old ids, just count from 0."""
378 newid = self._idcounter
386 newid = self._idcounter
379 self._idcounter += 1
387 self._idcounter += 1
380 return newid
388 return newid
381 # newid = 0
389 # newid = 0
382 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
390 # incoming = [id[0] for id in self.incoming_registrations.itervalues()]
383 # # print newid, self.ids, self.incoming_registrations
391 # # print newid, self.ids, self.incoming_registrations
384 # while newid in self.ids or newid in incoming:
392 # while newid in self.ids or newid in incoming:
385 # newid += 1
393 # newid += 1
386 # return newid
394 # return newid
387
395
388 #-----------------------------------------------------------------------------
396 #-----------------------------------------------------------------------------
389 # message validation
397 # message validation
390 #-----------------------------------------------------------------------------
398 #-----------------------------------------------------------------------------
391
399
392 def _validate_targets(self, targets):
400 def _validate_targets(self, targets):
393 """turn any valid targets argument into a list of integer ids"""
401 """turn any valid targets argument into a list of integer ids"""
394 if targets is None:
402 if targets is None:
395 # default to all
403 # default to all
396 targets = self.ids
404 targets = self.ids
397
405
398 if isinstance(targets, (int,str,unicode)):
406 if isinstance(targets, (int,str,unicode)):
399 # only one target specified
407 # only one target specified
400 targets = [targets]
408 targets = [targets]
401 _targets = []
409 _targets = []
402 for t in targets:
410 for t in targets:
403 # map raw identities to ids
411 # map raw identities to ids
404 if isinstance(t, (str,unicode)):
412 if isinstance(t, (str,unicode)):
405 t = self.by_ident.get(t, t)
413 t = self.by_ident.get(t, t)
406 _targets.append(t)
414 _targets.append(t)
407 targets = _targets
415 targets = _targets
408 bad_targets = [ t for t in targets if t not in self.ids ]
416 bad_targets = [ t for t in targets if t not in self.ids ]
409 if bad_targets:
417 if bad_targets:
410 raise IndexError("No Such Engine: %r"%bad_targets)
418 raise IndexError("No Such Engine: %r"%bad_targets)
411 if not targets:
419 if not targets:
412 raise IndexError("No Engines Registered")
420 raise IndexError("No Engines Registered")
413 return targets
421 return targets
414
422
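# A standalone sketch (illustrative names, not the Hub's real tables) of the
# normalization performed by _validate_targets: accept one engine id or uuid,
# or a list of either, and return a list of integer ids.
def normalize_targets(targets, ids, by_ident):
    if targets is None:
        targets = sorted(ids)                    # default to all engines
    if isinstance(targets, (int, str)):
        targets = [targets]
    resolved = [by_ident.get(t, t) for t in targets]
    bad = [t for t in resolved if t not in ids]
    if bad:
        raise IndexError("No Such Engine: %r" % bad)
    if not resolved:
        raise IndexError("No Engines Registered")
    return resolved

print(normalize_targets(['uuid-1', 0], {0, 1}, {'uuid-1': 1}))   # [1, 0]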
415 def _validate_client_msg(self, msg):
423 def _validate_client_msg(self, msg):
416 """validates and unpacks headers of a message. Returns False if invalid,
424 """validates and unpacks headers of a message. Returns False if invalid,
417 (ident, header, parent, content)"""
425 (ident, header, parent, content)"""
418 client_id = msg[0]
426 client_id = msg[0]
419 try:
427 try:
420 msg = self.session.unpack_message(msg[1:], content=True)
428 msg = self.session.unpack_message(msg[1:], content=True)
421 except:
429 except:
422 self.log.error("client::Invalid Message %s"%msg, exc_info=True)
430 self.log.error("client::Invalid Message %s"%msg, exc_info=True)
423 return False
431 return False
424
432
425 msg_type = msg.get('msg_type', None)
433 msg_type = msg.get('msg_type', None)
426 if msg_type is None:
434 if msg_type is None:
427 return False
435 return False
428 header = msg.get('header')
436 header = msg.get('header')
429 # session doesn't handle split content for now:
437 # session doesn't handle split content for now:
430 return client_id, msg
438 return client_id, msg
431
439
432
440
433 #-----------------------------------------------------------------------------
441 #-----------------------------------------------------------------------------
434 # dispatch methods (1 per stream)
442 # dispatch methods (1 per stream)
435 #-----------------------------------------------------------------------------
443 #-----------------------------------------------------------------------------
436
444
437 def dispatch_register_request(self, msg):
445 def dispatch_register_request(self, msg):
438 """"""
446 """"""
439 self.log.debug("registration::dispatch_register_request(%s)"%msg)
447 self.log.debug("registration::dispatch_register_request(%s)"%msg)
440 idents,msg = self.session.feed_identities(msg)
448 idents,msg = self.session.feed_identities(msg)
441 if not idents:
449 if not idents:
442 self.log.error("Bad Queue Message: %s"%msg, exc_info=True)
450 self.log.error("Bad Queue Message: %s"%msg, exc_info=True)
443 return
451 return
444 try:
452 try:
445 msg = self.session.unpack_message(msg,content=True)
453 msg = self.session.unpack_message(msg,content=True)
446 except:
454 except:
447 self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
455 self.log.error("registration::got bad registration message: %s"%msg, exc_info=True)
448 return
456 return
449
457
450 msg_type = msg['msg_type']
458 msg_type = msg['msg_type']
451 content = msg['content']
459 content = msg['content']
452
460
453 handler = self.registrar_handlers.get(msg_type, None)
461 handler = self.registrar_handlers.get(msg_type, None)
454 if handler is None:
462 if handler is None:
455 self.log.error("registration::got bad registration message: %s"%msg)
463 self.log.error("registration::got bad registration message: %s"%msg)
456 else:
464 else:
457 handler(idents, msg)
465 handler(idents, msg)
458
466
459 def dispatch_monitor_traffic(self, msg):
467 def dispatch_monitor_traffic(self, msg):
460 """all ME and Task queue messages come through here, as well as
468 """all ME and Task queue messages come through here, as well as
461 IOPub traffic."""
469 IOPub traffic."""
462 self.log.debug("monitor traffic: %s"%msg[:2])
470 self.log.debug("monitor traffic: %s"%msg[:2])
463 switch = msg[0]
471 switch = msg[0]
464 idents, msg = self.session.feed_identities(msg[1:])
472 idents, msg = self.session.feed_identities(msg[1:])
465 if not idents:
473 if not idents:
466 self.log.error("Bad Monitor Message: %s"%msg)
474 self.log.error("Bad Monitor Message: %s"%msg)
467 return
475 return
468 handler = self.monitor_handlers.get(switch, None)
476 handler = self.monitor_handlers.get(switch, None)
469 if handler is not None:
477 if handler is not None:
470 handler(idents, msg)
478 handler(idents, msg)
471 else:
479 else:
472 self.log.error("Invalid monitor topic: %s"%switch)
480 self.log.error("Invalid monitor topic: %s"%switch)
473
481
474
482
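# A sketch of the topic switch above: monitor messages arrive as multipart
# frames with the topic first, and the remaining frames go to that topic's
# handler. The topic and handler below are illustrative.
def route_monitor(frames, handlers):
    switch = frames[0]                # e.g. 'in', 'out', 'intask', 'iopub'
    handler = handlers.get(switch, None)
    if handler is None:
        raise ValueError("Invalid monitor topic: %s" % switch)
    return handler(frames[1:])

print(route_monitor(['iopub', 'payload'],
                    {'iopub': lambda rest: 'saved %r' % rest}))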
475 def dispatch_client_msg(self, msg):
483 def dispatch_client_msg(self, msg):
476 """Route messages from clients"""
484 """Route messages from clients"""
477 idents, msg = self.session.feed_identities(msg)
485 idents, msg = self.session.feed_identities(msg)
478 if not idents:
486 if not idents:
479 self.log.error("Bad Client Message: %s"%msg)
487 self.log.error("Bad Client Message: %s"%msg)
480 return
488 return
481 client_id = idents[0]
489 client_id = idents[0]
482 try:
490 try:
483 msg = self.session.unpack_message(msg, content=True)
491 msg = self.session.unpack_message(msg, content=True)
484 except:
492 except:
485 content = wrap_exception()
493 content = wrap_exception()
486 self.log.error("Bad Client Message: %s"%msg, exc_info=True)
494 self.log.error("Bad Client Message: %s"%msg, exc_info=True)
487 self.session.send(self.clientele, "hub_error", ident=client_id,
495 self.session.send(self.clientele, "hub_error", ident=client_id,
488 content=content)
496 content=content)
489 return
497 return
490
498
491 # print client_id, header, parent, content
499 # print client_id, header, parent, content
492 #switch on message type:
500 #switch on message type:
493 msg_type = msg['msg_type']
501 msg_type = msg['msg_type']
494 self.log.info("client:: client %s requested %s"%(client_id, msg_type))
502 self.log.info("client:: client %s requested %s"%(client_id, msg_type))
495 handler = self.client_handlers.get(msg_type, None)
503 handler = self.client_handlers.get(msg_type, None)
496 try:
504 try:
497 assert handler is not None, "Bad Message Type: %s"%msg_type
505 assert handler is not None, "Bad Message Type: %s"%msg_type
498 except:
506 except:
499 content = wrap_exception()
507 content = wrap_exception()
500 self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
508 self.log.error("Bad Message Type: %s"%msg_type, exc_info=True)
501 self.session.send(self.clientele, "hub_error", ident=client_id,
509 self.session.send(self.clientele, "hub_error", ident=client_id,
502 content=content)
510 content=content)
503 return
511 return
504 else:
512 else:
505 handler(client_id, msg)
513 handler(client_id, msg)
506
514
507 def dispatch_db(self, msg):
515 def dispatch_db(self, msg):
508 """"""
516 """"""
509 raise NotImplementedError
517 raise NotImplementedError
510
518
511 #---------------------------------------------------------------------------
519 #---------------------------------------------------------------------------
512 # handler methods (1 per event)
520 # handler methods (1 per event)
513 #---------------------------------------------------------------------------
521 #---------------------------------------------------------------------------
514
522
515 #----------------------- Heartbeat --------------------------------------
523 #----------------------- Heartbeat --------------------------------------
516
524
517 def handle_new_heart(self, heart):
525 def handle_new_heart(self, heart):
518 """handler to attach to heartbeater.
526 """handler to attach to heartbeater.
519 Called when a new heart starts to beat.
527 Called when a new heart starts to beat.
520 Triggers completion of registration."""
528 Triggers completion of registration."""
521 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
529 self.log.debug("heartbeat::handle_new_heart(%r)"%heart)
522 if heart not in self.incoming_registrations:
530 if heart not in self.incoming_registrations:
523 self.log.info("heartbeat::ignoring new heart: %r"%heart)
531 self.log.info("heartbeat::ignoring new heart: %r"%heart)
524 else:
532 else:
525 self.finish_registration(heart)
533 self.finish_registration(heart)
526
534
527
535
528 def handle_heart_failure(self, heart):
536 def handle_heart_failure(self, heart):
529 """handler to attach to heartbeater.
537 """handler to attach to heartbeater.
530 called when a previously registered heart fails to respond to beat request.
538 called when a previously registered heart fails to respond to beat request.
531 triggers unregistration"""
539 triggers unregistration"""
532 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
540 self.log.debug("heartbeat::handle_heart_failure(%r)"%heart)
533 eid = self.hearts.get(heart, None)
541 eid = self.hearts.get(heart, None)
534 if eid is None:
542 if eid is None:
535 self.log.info("heartbeat::ignoring heart failure %r"%heart)
543 self.log.info("heartbeat::ignoring heart failure %r"%heart)
536 else:
544 else:
537 queue = self.engines[eid].queue
545 queue = self.engines[eid].queue
538 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
546 self.unregister_engine(heart, dict(content=dict(id=eid, queue=queue)))
539
547
540 #----------------------- MUX Queue Traffic ------------------------------
548 #----------------------- MUX Queue Traffic ------------------------------
541
549
542 def save_queue_request(self, idents, msg):
550 def save_queue_request(self, idents, msg):
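"""Record the submission of a MUX queue request in the db."""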
543 if len(idents) < 2:
551 if len(idents) < 2:
544 self.log.error("invalid identity prefix: %s"%idents)
552 self.log.error("invalid identity prefix: %s"%idents)
545 return
553 return
546 queue_id, client_id = idents[:2]
554 queue_id, client_id = idents[:2]
547 try:
555 try:
548 msg = self.session.unpack_message(msg, content=False)
556 msg = self.session.unpack_message(msg, content=False)
549 except:
557 except:
550 self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
558 self.log.error("queue::client %r sent invalid message to %r: %s"%(client_id, queue_id, msg), exc_info=True)
551 return
559 return
552
560
553 eid = self.by_ident.get(queue_id, None)
561 eid = self.by_ident.get(queue_id, None)
554 if eid is None:
562 if eid is None:
555 self.log.error("queue::target %r not registered"%queue_id)
563 self.log.error("queue::target %r not registered"%queue_id)
556 self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
564 self.log.debug("queue:: valid are: %s"%(self.by_ident.keys()))
557 return
565 return
558
566
559 header = msg['header']
567 header = msg['header']
560 msg_id = header['msg_id']
568 msg_id = header['msg_id']
561 record = init_record(msg)
569 record = init_record(msg)
562 record['engine_uuid'] = queue_id
570 record['engine_uuid'] = queue_id
563 record['client_uuid'] = client_id
571 record['client_uuid'] = client_id
564 record['queue'] = 'mux'
572 record['queue'] = 'mux'
565 if MongoDB is not None and isinstance(self.db, MongoDB):
573 if MongoDB is not None and isinstance(self.db, MongoDB):
566 record['buffers'] = map(Binary, record['buffers'])
574 record['buffers'] = map(Binary, record['buffers'])
567 self.pending.add(msg_id)
575 self.pending.add(msg_id)
568 self.queues[eid].append(msg_id)
576 self.queues[eid].append(msg_id)
569 self.db.add_record(msg_id, record)
577 self.db.add_record(msg_id, record)
570
578
571 def save_queue_result(self, idents, msg):
579 def save_queue_result(self, idents, msg):
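"""Record the completion of a MUX queue request."""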
572 if len(idents) < 2:
580 if len(idents) < 2:
573 self.log.error("invalid identity prefix: %s"%idents)
581 self.log.error("invalid identity prefix: %s"%idents)
574 return
582 return
575
583
576 client_id, queue_id = idents[:2]
584 client_id, queue_id = idents[:2]
577 try:
585 try:
578 msg = self.session.unpack_message(msg, content=False)
586 msg = self.session.unpack_message(msg, content=False)
579 except:
587 except:
580 self.log.error("queue::engine %r sent invalid message to %r: %s"%(
588 self.log.error("queue::engine %r sent invalid message to %r: %s"%(
581 queue_id,client_id, msg), exc_info=True)
589 queue_id,client_id, msg), exc_info=True)
582 return
590 return
583
591
584 eid = self.by_ident.get(queue_id, None)
592 eid = self.by_ident.get(queue_id, None)
585 if eid is None:
593 if eid is None:
586 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
594 self.log.error("queue::unknown engine %r is sending a reply: "%queue_id)
587 self.log.debug("queue:: %s"%msg[2:])
595 self.log.debug("queue:: %s"%msg[2:])
588 return
596 return
589
597
590 parent = msg['parent_header']
598 parent = msg['parent_header']
591 if not parent:
599 if not parent:
592 return
600 return
593 msg_id = parent['msg_id']
601 msg_id = parent['msg_id']
594 if msg_id in self.pending:
602 if msg_id in self.pending:
595 self.pending.remove(msg_id)
603 self.pending.remove(msg_id)
596 self.all_completed.add(msg_id)
604 self.all_completed.add(msg_id)
597 self.queues[eid].remove(msg_id)
605 self.queues[eid].remove(msg_id)
598 self.completed[eid].append(msg_id)
606 self.completed[eid].append(msg_id)
599 rheader = msg['header']
607 rheader = msg['header']
600 completed = datetime.strptime(rheader['date'], ISO8601)
608 completed = datetime.strptime(rheader['date'], ISO8601)
601 started = rheader.get('started', None)
609 started = rheader.get('started', None)
602 if started is not None:
610 if started is not None:
603 started = datetime.strptime(started, ISO8601)
611 started = datetime.strptime(started, ISO8601)
604 result = {
612 result = {
605 'result_header' : rheader,
613 'result_header' : rheader,
606 'result_content': msg['content'],
614 'result_content': msg['content'],
607 'started' : started,
615 'started' : started,
608 'completed' : completed
616 'completed' : completed
609 }
617 }
610 if MongoDB is not None and isinstance(self.db, MongoDB):
618 if MongoDB is not None and isinstance(self.db, MongoDB):
611 result['result_buffers'] = map(Binary, msg['buffers'])
619 result['result_buffers'] = map(Binary, msg['buffers'])
612 else:
620 else:
613 result['result_buffers'] = msg['buffers']
621 result['result_buffers'] = msg['buffers']
614 self.db.update_record(msg_id, result)
622 self.db.update_record(msg_id, result)
615 else:
623 else:
616 self.log.debug("queue:: unknown msg finished %s"%msg_id)
624 self.log.debug("queue:: unknown msg finished %s"%msg_id)
617
625
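# A sketch of the timestamp handling in the two result handlers above.
# Assumption: ISO8601 is the strptime format the session layer stamps
# headers with, taken here to be "%Y-%m-%dT%H:%M:%S.%f".
from datetime import datetime

ISO8601 = "%Y-%m-%dT%H:%M:%S.%f"
header = {'date': '2010-11-05T12:30:01.000500', 'started': None}
completed = datetime.strptime(header['date'], ISO8601)
started = header.get('started', None)
if started is not None:
    started = datetime.strptime(started, ISO8601)
print((completed, started))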
618 #--------------------- Task Queue Traffic ------------------------------
626 #--------------------- Task Queue Traffic ------------------------------
619
627
620 def save_task_request(self, idents, msg):
628 def save_task_request(self, idents, msg):
621 """Save the submission of a task."""
629 """Save the submission of a task."""
622 client_id = idents[0]
630 client_id = idents[0]
623
631
624 try:
632 try:
625 msg = self.session.unpack_message(msg, content=False)
633 msg = self.session.unpack_message(msg, content=False)
626 except:
634 except:
627 self.log.error("task::client %r sent invalid task message: %s"%(
635 self.log.error("task::client %r sent invalid task message: %s"%(
628 client_id, msg), exc_info=True)
636 client_id, msg), exc_info=True)
629 return
637 return
630 record = init_record(msg)
638 record = init_record(msg)
631 if MongoDB is not None and isinstance(self.db, MongoDB):
639 if MongoDB is not None and isinstance(self.db, MongoDB):
632 record['buffers'] = map(Binary, record['buffers'])
640 record['buffers'] = map(Binary, record['buffers'])
633 record['client_uuid'] = client_id
641 record['client_uuid'] = client_id
634 record['queue'] = 'task'
642 record['queue'] = 'task'
635 header = msg['header']
643 header = msg['header']
636 msg_id = header['msg_id']
644 msg_id = header['msg_id']
637 self.pending.add(msg_id)
645 self.pending.add(msg_id)
638 self.db.add_record(msg_id, record)
646 self.db.add_record(msg_id, record)
639
647
640 def save_task_result(self, idents, msg):
648 def save_task_result(self, idents, msg):
641 """save the result of a completed task."""
649 """save the result of a completed task."""
642 client_id = idents[0]
650 client_id = idents[0]
643 try:
651 try:
644 msg = self.session.unpack_message(msg, content=False)
652 msg = self.session.unpack_message(msg, content=False)
645 except:
653 except:
646 self.log.error("task::invalid task result message send to %r: %s"%(
654 self.log.error("task::invalid task result message send to %r: %s"%(
647 client_id, msg), exc_info=True)
655 client_id, msg), exc_info=True)
648 raise
656 raise
649 return
657 return
650
658
651 parent = msg['parent_header']
659 parent = msg['parent_header']
652 if not parent:
660 if not parent:
653 # print msg
661 # print msg
654 self.log.warn("Task %r had no parent!"%msg)
662 self.log.warn("Task %r had no parent!"%msg)
655 return
663 return
656 msg_id = parent['msg_id']
664 msg_id = parent['msg_id']
657
665
658 header = msg['header']
666 header = msg['header']
659 engine_uuid = header.get('engine', None)
667 engine_uuid = header.get('engine', None)
660 eid = self.by_ident.get(engine_uuid, None)
668 eid = self.by_ident.get(engine_uuid, None)
661
669
662 if msg_id in self.pending:
670 if msg_id in self.pending:
663 self.pending.remove(msg_id)
671 self.pending.remove(msg_id)
664 self.all_completed.add(msg_id)
672 self.all_completed.add(msg_id)
665 if eid is not None:
673 if eid is not None:
666 self.completed[eid].append(msg_id)
674 self.completed[eid].append(msg_id)
667 if msg_id in self.tasks[eid]:
675 if msg_id in self.tasks[eid]:
668 self.tasks[eid].remove(msg_id)
676 self.tasks[eid].remove(msg_id)
669 completed = datetime.strptime(header['date'], ISO8601)
677 completed = datetime.strptime(header['date'], ISO8601)
670 started = header.get('started', None)
678 started = header.get('started', None)
671 if started is not None:
679 if started is not None:
672 started = datetime.strptime(started, ISO8601)
680 started = datetime.strptime(started, ISO8601)
673 result = {
681 result = {
674 'result_header' : header,
682 'result_header' : header,
675 'result_content': msg['content'],
683 'result_content': msg['content'],
676 'started' : started,
684 'started' : started,
677 'completed' : completed,
685 'completed' : completed,
678 'engine_uuid': engine_uuid
686 'engine_uuid': engine_uuid
679 }
687 }
680 if MongoDB is not None and isinstance(self.db, MongoDB):
688 if MongoDB is not None and isinstance(self.db, MongoDB):
681 result['result_buffers'] = map(Binary, msg['buffers'])
689 result['result_buffers'] = map(Binary, msg['buffers'])
682 else:
690 else:
683 result['result_buffers'] = msg['buffers']
691 result['result_buffers'] = msg['buffers']
684 self.db.update_record(msg_id, result)
692 self.db.update_record(msg_id, result)
685
693
686 else:
694 else:
687 self.log.debug("task::unknown task %s finished"%msg_id)
695 self.log.debug("task::unknown task %s finished"%msg_id)
688
696
689 def save_task_destination(self, idents, msg):
697 def save_task_destination(self, idents, msg):
690 try:
698 try:
691 msg = self.session.unpack_message(msg, content=True)
699 msg = self.session.unpack_message(msg, content=True)
692 except:
700 except:
693 self.log.error("task::invalid task tracking message", exc_info=True)
701 self.log.error("task::invalid task tracking message", exc_info=True)
694 return
702 return
695 content = msg['content']
703 content = msg['content']
696 self.log.debug("task::task destination content: %r"%content)
704 self.log.debug("task::task destination content: %r"%content)
697 msg_id = content['msg_id']
705 msg_id = content['msg_id']
698 engine_uuid = content['engine_id']
706 engine_uuid = content['engine_id']
699 eid = self.by_ident[engine_uuid]
707 eid = self.by_ident[engine_uuid]
700
708
701 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
709 self.log.info("task::task %s arrived on %s"%(msg_id, eid))
702 # if msg_id in self.mia:
710 # if msg_id in self.mia:
703 # self.mia.remove(msg_id)
711 # self.mia.remove(msg_id)
704 # else:
712 # else:
705 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
713 # self.log.debug("task::task %s not listed as MIA?!"%(msg_id))
706
714
707 self.tasks[eid].append(msg_id)
715 self.tasks[eid].append(msg_id)
708 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
716 # self.pending[msg_id][1].update(received=datetime.now(),engine=(eid,engine_uuid))
709 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
717 self.db.update_record(msg_id, dict(engine_uuid=engine_uuid))
710
718
711 def mia_task_request(self, idents, msg):
719 def mia_task_request(self, idents, msg):
712 raise NotImplementedError
720 raise NotImplementedError
713 client_id = idents[0]
721 client_id = idents[0]
714 # content = dict(mia=self.mia,status='ok')
722 # content = dict(mia=self.mia,status='ok')
715 # self.session.send('mia_reply', content=content, idents=client_id)
723 # self.session.send('mia_reply', content=content, idents=client_id)
716
724
717
725
718 #--------------------- IOPub Traffic ------------------------------
726 #--------------------- IOPub Traffic ------------------------------
719
727
720 def save_iopub_message(self, topics, msg):
728 def save_iopub_message(self, topics, msg):
721 """save an iopub message into the db"""
729 """save an iopub message into the db"""
722 self.log.debug("iopub::received iopub message on topics %r"%topics)
730 self.log.debug("iopub::received iopub message on topics %r"%topics)
723 try:
731 try:
724 msg = self.session.unpack_message(msg, content=True)
732 msg = self.session.unpack_message(msg, content=True)
725 except:
733 except:
726 self.log.error("iopub::invalid IOPub message", exc_info=True)
734 self.log.error("iopub::invalid IOPub message", exc_info=True)
727 return
735 return
728
736
729 parent = msg['parent_header']
737 parent = msg['parent_header']
730 if not parent:
738 if not parent:
731 self.log.error("iopub::invalid IOPub message: %s"%msg)
739 self.log.error("iopub::invalid IOPub message: %s"%msg)
732 return
740 return
733 msg_id = parent['msg_id']
741 msg_id = parent['msg_id']
734 msg_type = msg['msg_type']
742 msg_type = msg['msg_type']
735 content = msg['content']
743 content = msg['content']
736
744
737 # ensure msg_id is in db
745 # ensure msg_id is in db
738 try:
746 try:
739 rec = self.db.get_record(msg_id)
747 rec = self.db.get_record(msg_id)
740 except:
748 except:
741 self.log.error("iopub::IOPub message has invalid parent", exc_info=True)
749 self.log.error("iopub::IOPub message has invalid parent", exc_info=True)
742 return
750 return
743 # stream
751 # stream
744 d = {}
752 d = {}
745 if msg_type == 'stream':
753 if msg_type == 'stream':
746 name = content['name']
754 name = content['name']
747 s = rec[name] or ''
755 s = rec[name] or ''
748 d[name] = s + content['data']
756 d[name] = s + content['data']
749
757
750 elif msg_type == 'pyerr':
758 elif msg_type == 'pyerr':
751 d['pyerr'] = content
759 d['pyerr'] = content
752 else:
760 else:
753 d[msg_type] = content['data']
761 d[msg_type] = content['data']
754
762
755 self.db.update_record(msg_id, d)
763 self.db.update_record(msg_id, d)
756
764
757
765
758
766
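# A sketch of the stream-accumulation step above: successive 'stream'
# messages append their data to whatever stdout/stderr the record already
# holds. The message contents here are illustrative.
rec = {'stdout': '', 'stderr': ''}
for content in ({'name': 'stdout', 'data': 'hello '},
                {'name': 'stdout', 'data': 'world\n'}):
    name = content['name']
    rec[name] = (rec[name] or '') + content['data']
print(repr(rec['stdout']))                       # 'hello world\n'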
759 #-------------------------------------------------------------------------
767 #-------------------------------------------------------------------------
760 # Registration requests
768 # Registration requests
761 #-------------------------------------------------------------------------
769 #-------------------------------------------------------------------------
762
770
763 def connection_request(self, client_id, msg):
771 def connection_request(self, client_id, msg):
764 """Reply with connection addresses for clients."""
772 """Reply with connection addresses for clients."""
765 self.log.info("client::client %s connected"%client_id)
773 self.log.info("client::client %s connected"%client_id)
766 content = dict(status='ok')
774 content = dict(status='ok')
767 content.update(self.client_addrs)
775 content.update(self.client_info)
768 jsonable = {}
776 jsonable = {}
769 for k,v in self.keytable.iteritems():
777 for k,v in self.keytable.iteritems():
770 jsonable[str(k)] = v
778 jsonable[str(k)] = v
771 content['engines'] = jsonable
779 content['engines'] = jsonable
772 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
780 self.session.send(self.registrar, 'connection_reply', content, parent=msg, ident=client_id)
773
781
774 def register_engine(self, reg, msg):
782 def register_engine(self, reg, msg):
775 """Register a new engine."""
783 """Register a new engine."""
776 content = msg['content']
784 content = msg['content']
777 try:
785 try:
778 queue = content['queue']
786 queue = content['queue']
779 except KeyError:
787 except KeyError:
780 self.log.error("registration::queue not specified", exc_info=True)
788 self.log.error("registration::queue not specified", exc_info=True)
781 return
789 return
782 heart = content.get('heartbeat', None)
790 heart = content.get('heartbeat', None)
783 """register a new engine, and create the socket(s) necessary"""
791 """register a new engine, and create the socket(s) necessary"""
784 eid = self._next_id
792 eid = self._next_id
785 # print (eid, queue, reg, heart)
793 # print (eid, queue, reg, heart)
786
794
787 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
795 self.log.debug("registration::register_engine(%i, %r, %r, %r)"%(eid, queue, reg, heart))
788
796
789 content = dict(id=eid,status='ok')
797 content = dict(id=eid,status='ok')
790 content.update(self.engine_addrs)
798 content.update(self.engine_info)
791 # check if requesting available IDs:
799 # check if requesting available IDs:
792 if queue in self.by_ident:
800 if queue in self.by_ident:
793 try:
801 try:
794 raise KeyError("queue_id %r in use"%queue)
802 raise KeyError("queue_id %r in use"%queue)
795 except:
803 except:
796 content = wrap_exception()
804 content = wrap_exception()
797 self.log.error("queue_id %r in use"%queue, exc_info=True)
805 self.log.error("queue_id %r in use"%queue, exc_info=True)
798 elif heart in self.hearts: # need to check unique hearts?
806 elif heart in self.hearts: # need to check unique hearts?
799 try:
807 try:
800 raise KeyError("heart_id %r in use"%heart)
808 raise KeyError("heart_id %r in use"%heart)
801 except:
809 except:
802 self.log.error("heart_id %r in use"%heart, exc_info=True)
810 self.log.error("heart_id %r in use"%heart, exc_info=True)
803 content = wrap_exception()
811 content = wrap_exception()
804 else:
812 else:
805 for h, pack in self.incoming_registrations.iteritems():
813 for h, pack in self.incoming_registrations.iteritems():
806 if heart == h:
814 if heart == h:
807 try:
815 try:
808 raise KeyError("heart_id %r in use"%heart)
816 raise KeyError("heart_id %r in use"%heart)
809 except:
817 except:
810 self.log.error("heart_id %r in use"%heart, exc_info=True)
818 self.log.error("heart_id %r in use"%heart, exc_info=True)
811 content = wrap_exception()
819 content = wrap_exception()
812 break
820 break
813 elif queue == pack[1]:
821 elif queue == pack[1]:
814 try:
822 try:
815 raise KeyError("queue_id %r in use"%queue)
823 raise KeyError("queue_id %r in use"%queue)
816 except:
824 except:
817 self.log.error("queue_id %r in use"%queue, exc_info=True)
825 self.log.error("queue_id %r in use"%queue, exc_info=True)
818 content = wrap_exception()
826 content = wrap_exception()
819 break
827 break
820
828
821 msg = self.session.send(self.registrar, "registration_reply",
829 msg = self.session.send(self.registrar, "registration_reply",
822 content=content,
830 content=content,
823 ident=reg)
831 ident=reg)
824
832
825 if content['status'] == 'ok':
833 if content['status'] == 'ok':
826 if heart in self.heartmonitor.hearts:
834 if heart in self.heartmonitor.hearts:
827 # already beating
835 # already beating
828 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
836 self.incoming_registrations[heart] = (eid,queue,reg[0],None)
829 self.finish_registration(heart)
837 self.finish_registration(heart)
830 else:
838 else:
831 purge = lambda : self._purge_stalled_registration(heart)
839 purge = lambda : self._purge_stalled_registration(heart)
832 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
840 dc = ioloop.DelayedCallback(purge, self.registration_timeout, self.loop)
833 dc.start()
841 dc.start()
834 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
842 self.incoming_registrations[heart] = (eid,queue,reg[0],dc)
835 else:
843 else:
836 self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
844 self.log.error("registration::registration %i failed: %s"%(eid, content['evalue']))
837 return eid
845 return eid
838
846
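# A sketch of the stalled-registration purge above, with threading.Timer
# standing in for ioloop.DelayedCallback (the real code stays on the zmq
# IOLoop; the heart id and tuple contents here are illustrative).
import threading

incoming_registrations = {'heart-1': (0, 'queue-uuid', 'reg-ident', None)}

def purge_stalled(heart):
    if heart in incoming_registrations:
        eid = incoming_registrations.pop(heart)[0]
        print("purging stalled registration: %i" % eid)

t = threading.Timer(0.01, purge_stalled, args=('heart-1',))
t.start()
t.join()
print(incoming_registrations)                    # {}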
839 def unregister_engine(self, ident, msg):
847 def unregister_engine(self, ident, msg):
840 """Unregister an engine that explicitly requested to leave."""
848 """Unregister an engine that explicitly requested to leave."""
841 try:
849 try:
842 eid = msg['content']['id']
850 eid = msg['content']['id']
843 except:
851 except:
844 self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
852 self.log.error("registration::bad engine id for unregistration: %s"%ident, exc_info=True)
845 return
853 return
846 self.log.info("registration::unregister_engine(%s)"%eid)
854 self.log.info("registration::unregister_engine(%s)"%eid)
847 content=dict(id=eid, queue=self.engines[eid].queue)
855 content=dict(id=eid, queue=self.engines[eid].queue)
848 self.ids.remove(eid)
856 self.ids.remove(eid)
849 self.keytable.pop(eid)
857 self.keytable.pop(eid)
850 ec = self.engines.pop(eid)
858 ec = self.engines.pop(eid)
851 self.hearts.pop(ec.heartbeat)
859 self.hearts.pop(ec.heartbeat)
852 self.by_ident.pop(ec.queue)
860 self.by_ident.pop(ec.queue)
853 self.completed.pop(eid)
861 self.completed.pop(eid)
854 for msg_id in self.queues.pop(eid):
862 for msg_id in self.queues.pop(eid):
855 msg = self.pending.remove(msg_id)
863 msg = self.pending.remove(msg_id)
856 ############## TODO: HANDLE IT ################
864 ############## TODO: HANDLE IT ################
857
865
858 if self.notifier:
866 if self.notifier:
859 self.session.send(self.notifier, "unregistration_notification", content=content)
867 self.session.send(self.notifier, "unregistration_notification", content=content)
860
868
861 def finish_registration(self, heart):
869 def finish_registration(self, heart):
862 """Second half of engine registration, called after our HeartMonitor
870 """Second half of engine registration, called after our HeartMonitor
863 has received a beat from the Engine's Heart."""
871 has received a beat from the Engine's Heart."""
864 try:
872 try:
865 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
873 (eid,queue,reg,purge) = self.incoming_registrations.pop(heart)
866 except KeyError:
874 except KeyError:
867 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
875 self.log.error("registration::tried to finish nonexistant registration", exc_info=True)
868 return
876 return
869 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
877 self.log.info("registration::finished registering engine %i:%r"%(eid,queue))
870 if purge is not None:
878 if purge is not None:
871 purge.stop()
879 purge.stop()
872 control = queue
880 control = queue
873 self.ids.add(eid)
881 self.ids.add(eid)
874 self.keytable[eid] = queue
882 self.keytable[eid] = queue
875 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
883 self.engines[eid] = EngineConnector(id=eid, queue=queue, registration=reg,
876 control=control, heartbeat=heart)
884 control=control, heartbeat=heart)
877 self.by_ident[queue] = eid
885 self.by_ident[queue] = eid
878 self.queues[eid] = list()
886 self.queues[eid] = list()
879 self.tasks[eid] = list()
887 self.tasks[eid] = list()
880 self.completed[eid] = list()
888 self.completed[eid] = list()
881 self.hearts[heart] = eid
889 self.hearts[heart] = eid
882 content = dict(id=eid, queue=self.engines[eid].queue)
890 content = dict(id=eid, queue=self.engines[eid].queue)
883 if self.notifier:
891 if self.notifier:
884 self.session.send(self.notifier, "registration_notification", content=content)
892 self.session.send(self.notifier, "registration_notification", content=content)
885 self.log.info("engine::Engine Connected: %i"%eid)
893 self.log.info("engine::Engine Connected: %i"%eid)
886
894
887 def _purge_stalled_registration(self, heart):
895 def _purge_stalled_registration(self, heart):
888 if heart in self.incoming_registrations:
896 if heart in self.incoming_registrations:
889 eid = self.incoming_registrations.pop(heart)[0]
897 eid = self.incoming_registrations.pop(heart)[0]
890 self.log.info("registration::purging stalled registration: %i"%eid)
898 self.log.info("registration::purging stalled registration: %i"%eid)
891 else:
899 else:
892 pass
900 pass
893
901
894 #-------------------------------------------------------------------------
902 #-------------------------------------------------------------------------
895 # Client Requests
903 # Client Requests
896 #-------------------------------------------------------------------------
904 #-------------------------------------------------------------------------
897
905
898 def shutdown_request(self, client_id, msg):
906 def shutdown_request(self, client_id, msg):
899 """handle shutdown request."""
907 """handle shutdown request."""
900 # s = self.context.socket(zmq.XREQ)
908 # s = self.context.socket(zmq.XREQ)
901 # s.connect(self.client_connections['mux'])
909 # s.connect(self.client_connections['mux'])
902 # time.sleep(0.1)
910 # time.sleep(0.1)
903 # for eid,ec in self.engines.iteritems():
911 # for eid,ec in self.engines.iteritems():
904 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
912 # self.session.send(s, 'shutdown_request', content=dict(restart=False), ident=ec.queue)
905 # time.sleep(1)
913 # time.sleep(1)
906 self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
914 self.session.send(self.clientele, 'shutdown_reply', content={'status': 'ok'}, ident=client_id)
907 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
915 dc = ioloop.DelayedCallback(lambda : self._shutdown(), 1000, self.loop)
908 dc.start()
916 dc.start()
909
917
910 def _shutdown(self):
918 def _shutdown(self):
911 self.log.info("hub::hub shutting down.")
919 self.log.info("hub::hub shutting down.")
912 time.sleep(0.1)
920 time.sleep(0.1)
913 sys.exit(0)
921 sys.exit(0)
914
922
915
923
916 def check_load(self, client_id, msg):
924 def check_load(self, client_id, msg):
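"""Report the pending queue and task counts for the given targets."""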
917 content = msg['content']
925 content = msg['content']
918 try:
926 try:
919 targets = content['targets']
927 targets = content['targets']
920 targets = self._validate_targets(targets)
928 targets = self._validate_targets(targets)
921 except:
929 except:
922 content = wrap_exception()
930 content = wrap_exception()
923 self.session.send(self.clientele, "hub_error",
931 self.session.send(self.clientele, "hub_error",
924 content=content, ident=client_id)
932 content=content, ident=client_id)
925 return
933 return
926
934
927 content = dict(status='ok')
935 content = dict(status='ok')
928 # loads = {}
936 # loads = {}
929 for t in targets:
937 for t in targets:
930 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
938 content[bytes(t)] = len(self.queues[t])+len(self.tasks[t])
931 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
939 self.session.send(self.clientele, "load_reply", content=content, ident=client_id)
932
940
933
941
934 def queue_status(self, client_id, msg):
942 def queue_status(self, client_id, msg):
935 """Return the Queue status of one or more targets.
943 """Return the Queue status of one or more targets.
936 if verbose: return the msg_ids
944 if verbose: return the msg_ids
937 else: return len of each type.
945 else: return len of each type.
938 keys: queue (pending MUX jobs)
946 keys: queue (pending MUX jobs)
939 tasks (pending Task jobs)
947 tasks (pending Task jobs)
940 completed (finished jobs from both queues)"""
948 completed (finished jobs from both queues)"""
941 content = msg['content']
949 content = msg['content']
942 targets = content['targets']
950 targets = content['targets']
943 try:
951 try:
944 targets = self._validate_targets(targets)
952 targets = self._validate_targets(targets)
945 except:
953 except:
946 content = wrap_exception()
954 content = wrap_exception()
947 self.session.send(self.clientele, "hub_error",
955 self.session.send(self.clientele, "hub_error",
948 content=content, ident=client_id)
956 content=content, ident=client_id)
949 return
957 return
950 verbose = content.get('verbose', False)
958 verbose = content.get('verbose', False)
951 content = dict(status='ok')
959 content = dict(status='ok')
952 for t in targets:
960 for t in targets:
953 queue = self.queues[t]
961 queue = self.queues[t]
954 completed = self.completed[t]
962 completed = self.completed[t]
955 tasks = self.tasks[t]
963 tasks = self.tasks[t]
956 if not verbose:
964 if not verbose:
957 queue = len(queue)
965 queue = len(queue)
958 completed = len(completed)
966 completed = len(completed)
959 tasks = len(tasks)
967 tasks = len(tasks)
960 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
968 content[bytes(t)] = {'queue': queue, 'completed': completed , 'tasks': tasks}
961 # pending
969 # pending
962 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
970 self.session.send(self.clientele, "queue_reply", content=content, ident=client_id)
963
971
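# A sketch of the two reply shapes produced above: non-verbose replies carry
# counts, verbose replies carry the msg_id lists themselves. All values here
# are illustrative.
non_verbose = {'status': 'ok',
               '0': {'queue': 2, 'completed': 10, 'tasks': 1}}
verbose = {'status': 'ok',
           '0': {'queue': ['msg-a', 'msg-b'],
                 'completed': ['msg-c'],
                 'tasks': []}}
assert non_verbose['0']['queue'] == len(verbose['0']['queue'])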
964 def purge_results(self, client_id, msg):
972 def purge_results(self, client_id, msg):
965 """Purge results from memory. This method is more valuable before we move
973 """Purge results from memory. This method is more valuable before we move
966 to a DB based message storage mechanism."""
974 to a DB based message storage mechanism."""
967 content = msg['content']
975 content = msg['content']
968 msg_ids = content.get('msg_ids', [])
976 msg_ids = content.get('msg_ids', [])
969 reply = dict(status='ok')
977 reply = dict(status='ok')
970 if msg_ids == 'all':
978 if msg_ids == 'all':
971 self.db.drop_matching_records(dict(completed={'$ne':None}))
979 self.db.drop_matching_records(dict(completed={'$ne':None}))
972 else:
980 else:
973 for msg_id in msg_ids:
981 for msg_id in msg_ids:
974 if msg_id in self.all_completed:
982 if msg_id in self.all_completed:
975 self.db.drop_record(msg_id)
983 self.db.drop_record(msg_id)
976 else:
984 else:
977 if msg_id in self.pending:
985 if msg_id in self.pending:
978 try:
986 try:
979 raise IndexError("msg pending: %r"%msg_id)
987 raise IndexError("msg pending: %r"%msg_id)
980 except:
988 except:
981 reply = wrap_exception()
989 reply = wrap_exception()
982 else:
990 else:
983 try:
991 try:
984 raise IndexError("No such msg: %r"%msg_id)
992 raise IndexError("No such msg: %r"%msg_id)
985 except:
993 except:
986 reply = wrap_exception()
994 reply = wrap_exception()
987 break
995 break
988 eids = content.get('engine_ids', [])
996 eids = content.get('engine_ids', [])
989 for eid in eids:
997 for eid in eids:
990 if eid not in self.engines:
998 if eid not in self.engines:
991 try:
999 try:
992 raise IndexError("No such engine: %i"%eid)
1000 raise IndexError("No such engine: %i"%eid)
993 except:
1001 except:
994 reply = wrap_exception()
1002 reply = wrap_exception()
995 break
1003 break
996 msg_ids = self.completed.pop(eid)
1004 msg_ids = self.completed.pop(eid)
997 uid = self.engines[eid].queue
1005 uid = self.engines[eid].queue
998 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
1006 self.db.drop_matching_records(dict(engine_uuid=uid, completed={'$ne':None}))
999
1007
1000 self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
1008 self.session.send(self.clientele, 'purge_reply', content=reply, ident=client_id)
1001
1009
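# A sketch of the Mongo-style query used above: {'completed': {'$ne': None}}
# matches only records whose 'completed' field has been set, i.e. finished
# jobs. The tiny matcher below interprets just that one operator.
records = {'a': {'completed': None}, 'b': {'completed': '2010-11-05'}}

def matches(rec, query):
    for key, cond in query.items():
        if isinstance(cond, dict) and '$ne' in cond:
            if rec.get(key) == cond['$ne']:
                return False
        elif rec.get(key) != cond:
            return False
    return True

done = [m for m, r in sorted(records.items())
        if matches(r, {'completed': {'$ne': None}})]
print(done)                                      # ['b']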
1002 def resubmit_task(self, client_id, msg, buffers):
1010 def resubmit_task(self, client_id, msg, buffers):
1003 """Resubmit a task."""
1011 """Resubmit a task."""
1004 raise NotImplementedError
1012 raise NotImplementedError
1005
1013
1006 def get_results(self, client_id, msg):
1014 def get_results(self, client_id, msg):
1007 """Get the result of 1 or more messages."""
1015 """Get the result of 1 or more messages."""
1008 content = msg['content']
1016 content = msg['content']
1009 msg_ids = sorted(set(content['msg_ids']))
1017 msg_ids = sorted(set(content['msg_ids']))
1010 statusonly = content.get('status_only', False)
1018 statusonly = content.get('status_only', False)
1011 pending = []
1019 pending = []
1012 completed = []
1020 completed = []
1013 content = dict(status='ok')
1021 content = dict(status='ok')
1014 content['pending'] = pending
1022 content['pending'] = pending
1015 content['completed'] = completed
1023 content['completed'] = completed
1016 buffers = []
1024 buffers = []
1017 if not statusonly:
1025 if not statusonly:
1018 content['results'] = {}
1026 content['results'] = {}
1019 records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1027 records = self.db.find_records(dict(msg_id={'$in':msg_ids}))
1020 for msg_id in msg_ids:
1028 for msg_id in msg_ids:
1021 if msg_id in self.pending:
1029 if msg_id in self.pending:
1022 pending.append(msg_id)
1030 pending.append(msg_id)
1023 elif msg_id in self.all_completed:
1031 elif msg_id in self.all_completed:
1024 completed.append(msg_id)
1032 completed.append(msg_id)
1025 if not statusonly:
1033 if not statusonly:
1026 rec = records[msg_id]
1034 rec = records[msg_id]
1027 io_dict = {}
1035 io_dict = {}
1028 for key in 'pyin pyout pyerr stdout stderr'.split():
1036 for key in 'pyin pyout pyerr stdout stderr'.split():
1029 io_dict[key] = rec[key]
1037 io_dict[key] = rec[key]
1030 content[msg_id] = { 'result_content': rec['result_content'],
1038 content[msg_id] = { 'result_content': rec['result_content'],
1031 'header': rec['header'],
1039 'header': rec['header'],
1032 'result_header' : rec['result_header'],
1040 'result_header' : rec['result_header'],
1033 'io' : io_dict,
1041 'io' : io_dict,
1034 }
1042 }
1035 buffers.extend(map(str, rec['result_buffers']))
1043 buffers.extend(map(str, rec['result_buffers']))
1036 else:
1044 else:
1037 try:
1045 try:
1038 raise KeyError('No such message: '+msg_id)
1046 raise KeyError('No such message: '+msg_id)
1039 except:
1047 except:
1040 content = wrap_exception()
1048 content = wrap_exception()
1041 break
1049 break
1042 self.session.send(self.clientele, "result_reply", content=content,
1050 self.session.send(self.clientele, "result_reply", content=content,
1043 parent=msg, ident=client_id,
1051 parent=msg, ident=client_id,
1044 buffers=buffers)
1052 buffers=buffers)
1045
1053
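# A sketch of the msg_id classification above: requested ids are sorted into
# pending vs completed before any stored records are attached. The ids and
# sets here are illustrative.
pending_set, completed_set = {'m1'}, {'m2', 'm3'}
reply = {'status': 'ok', 'pending': [], 'completed': []}
for msg_id in sorted({'m1', 'm2'}):
    if msg_id in pending_set:
        reply['pending'].append(msg_id)
    elif msg_id in completed_set:
        reply['completed'].append(msg_id)
print(reply)        # {'status': 'ok', 'pending': ['m1'], 'completed': ['m2']}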
@@ -1,390 +1,390 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The IPython controller application.
4 The IPython controller application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import copy
20 import copy
21 import sys
21 import sys
22 import os
22 import os
23 import logging
23 import logging
24 import stat
24 import stat
25 import socket
25 import socket
26
26
27 import uuid
27 import uuid
28
28
29 import zmq
29 import zmq
30 from zmq.log.handlers import PUBHandler
30 from zmq.log.handlers import PUBHandler
31 from zmq.utils import jsonapi as json
31 from zmq.utils import jsonapi as json
32
32
33 from IPython.config.loader import Config
33 from IPython.config.loader import Config
34 from IPython.zmq.parallel import factory
34 from IPython.zmq.parallel import factory
35 from IPython.zmq.parallel.controller import ControllerFactory
35 from IPython.zmq.parallel.controller import ControllerFactory
36 from IPython.zmq.parallel.clusterdir import (
36 from IPython.zmq.parallel.clusterdir import (
37 ApplicationWithClusterDir,
37 ApplicationWithClusterDir,
38 ClusterDirConfigLoader
38 ClusterDirConfigLoader
39 )
39 )
40 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
40 # from IPython.kernel.fcutil import FCServiceFactory, FURLError
41 from IPython.utils.traitlets import Instance, Unicode
41 from IPython.utils.traitlets import Instance, Unicode
42
42
43 from util import disambiguate_ip_address, split_url
43 from util import disambiguate_ip_address, split_url
44
44
45
45
46 #-----------------------------------------------------------------------------
46 #-----------------------------------------------------------------------------
47 # Module level variables
47 # Module level variables
48 #-----------------------------------------------------------------------------
48 #-----------------------------------------------------------------------------
49
49
50
50
51 #: The default config file name for this application
51 #: The default config file name for this application
52 default_config_file_name = u'ipcontroller_config.py'
52 default_config_file_name = u'ipcontroller_config.py'
53
53
54
54
55 _description = """Start the IPython controller for parallel computing.
55 _description = """Start the IPython controller for parallel computing.
56
56
57 The IPython controller provides a gateway between the IPython engines and
57 The IPython controller provides a gateway between the IPython engines and
58 clients. The controller needs to be started before the engines and can be
58 clients. The controller needs to be started before the engines and can be
59 configured using command line options or using a cluster directory. Cluster
59 configured using command line options or using a cluster directory. Cluster
60 directories contain config, log and security files and are usually located in
60 directories contain config, log and security files and are usually located in
61 your .ipython directory and named "cluster_<profile>". See the --profile
61 your .ipython directory and named "cluster_<profile>". See the --profile
62 and --cluster-dir options for details.
62 and --cluster-dir options for details.
63 """
63 """
64
64
65 #-----------------------------------------------------------------------------
65 #-----------------------------------------------------------------------------
66 # Default interfaces
66 # Default interfaces
67 #-----------------------------------------------------------------------------
67 #-----------------------------------------------------------------------------
68
68
69 # The default client interfaces for FCClientServiceFactory.interfaces
69 # The default client interfaces for FCClientServiceFactory.interfaces
70 default_client_interfaces = Config()
70 default_client_interfaces = Config()
71 default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
71 default_client_interfaces.Default.url_file = 'ipcontroller-client.url'
72
72
73 # Make this a dict we can pass to Config.__init__ for the default
73 # Make this a dict we can pass to Config.__init__ for the default
74 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
74 default_client_interfaces = dict(copy.deepcopy(default_client_interfaces.items()))
75
75
76
76
77
77
78 # The default engine interfaces for FCEngineServiceFactory.interfaces
78 # The default engine interfaces for FCEngineServiceFactory.interfaces
79 default_engine_interfaces = Config()
79 default_engine_interfaces = Config()
80 default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
80 default_engine_interfaces.Default.url_file = u'ipcontroller-engine.url'
81
81
82 # Make this a dict we can pass to Config.__init__ for the default
82 # Make this a dict we can pass to Config.__init__ for the default
83 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
83 default_engine_interfaces = dict(copy.deepcopy(default_engine_interfaces.items()))
84
84
85
85
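# A sketch of the Config -> plain-dict conversion above, with a nested dict
# standing in for IPython's Config object (same .items() interface).
import copy

cfg_like = {'Default': {'url_file': 'ipcontroller-engine.url'}}
as_dict = dict(copy.deepcopy(list(cfg_like.items())))
print(as_dict['Default']['url_file'])            # ipcontroller-engine.url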
86 #-----------------------------------------------------------------------------
86 #-----------------------------------------------------------------------------
87 # Service factories
87 # Service factories
88 #-----------------------------------------------------------------------------
88 #-----------------------------------------------------------------------------
89
89
90 #
90 #
91 # class FCClientServiceFactory(FCServiceFactory):
91 # class FCClientServiceFactory(FCServiceFactory):
92 # """A Foolscap implementation of the client services."""
92 # """A Foolscap implementation of the client services."""
93 #
93 #
94 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
94 # cert_file = Unicode(u'ipcontroller-client.pem', config=True)
95 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
95 # interfaces = Instance(klass=Config, kw=default_client_interfaces,
96 # allow_none=False, config=True)
96 # allow_none=False, config=True)
97 #
97 #
98 #
98 #
99 # class FCEngineServiceFactory(FCServiceFactory):
99 # class FCEngineServiceFactory(FCServiceFactory):
100 # """A Foolscap implementation of the engine services."""
100 # """A Foolscap implementation of the engine services."""
101 #
101 #
102 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
102 # cert_file = Unicode(u'ipcontroller-engine.pem', config=True)
103 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
103 # interfaces = Instance(klass=dict, kw=default_engine_interfaces,
104 # allow_none=False, config=True)
104 # allow_none=False, config=True)
105 #
105 #
106
106
107 #-----------------------------------------------------------------------------
107 #-----------------------------------------------------------------------------
108 # Command line options
108 # Command line options
109 #-----------------------------------------------------------------------------
109 #-----------------------------------------------------------------------------
110
110
111
111
112 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
112 class IPControllerAppConfigLoader(ClusterDirConfigLoader):
113
113
114 def _add_arguments(self):
114 def _add_arguments(self):
115 super(IPControllerAppConfigLoader, self)._add_arguments()
115 super(IPControllerAppConfigLoader, self)._add_arguments()
116 paa = self.parser.add_argument
116 paa = self.parser.add_argument
117
117
118 ## Hub Config:
118 ## Hub Config:
119 paa('--mongodb',
119 paa('--mongodb',
120 dest='HubFactory.db_class', action='store_const',
120 dest='HubFactory.db_class', action='store_const',
121 const='IPython.zmq.parallel.mongodb.MongoDB',
121 const='IPython.zmq.parallel.mongodb.MongoDB',
122 help='Use MongoDB task storage [default: in-memory]')
122 help='Use MongoDB task storage [default: in-memory]')
123 paa('--hb',
123 paa('--hb',
124 type=int, dest='HubFactory.hb', nargs=2,
124 type=int, dest='HubFactory.hb', nargs=2,
125 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
125 help='The (2) ports the Hub\'s Heartmonitor will use for the heartbeat '
126 'connections [default: random]',
126 'connections [default: random]',
127 metavar='Hub.hb_ports')
127 metavar='Hub.hb_ports')
128 paa('--ping',
128 paa('--ping',
129 type=int, dest='HubFactory.ping',
129 type=int, dest='HubFactory.ping',
130 help='The frequency at which the Hub pings the engines for heartbeats '
130 help='The frequency at which the Hub pings the engines for heartbeats '
131 '(in ms) [default: 100]',
131 '(in ms) [default: 100]',
132 metavar='Hub.ping')
132 metavar='Hub.ping')
133
133
134 # Client config
134 # Client config
135 paa('--client-ip',
135 paa('--client-ip',
136 type=str, dest='HubFactory.client_ip',
136 type=str, dest='HubFactory.client_ip',
137 help='The IP address or hostname the Hub will listen on for '
137 help='The IP address or hostname the Hub will listen on for '
138 'client connections. Both engine-ip and client-ip can be set simultaneously '
138 'client connections. Both engine-ip and client-ip can be set simultaneously '
139 'via --ip [default: loopback]',
139 'via --ip [default: loopback]',
140 metavar='Hub.client_ip')
140 metavar='Hub.client_ip')
141 paa('--client-transport',
141 paa('--client-transport',
142 type=str, dest='HubFactory.client_transport',
142 type=str, dest='HubFactory.client_transport',
143 help='The ZeroMQ transport the Hub will use for '
143 help='The ZeroMQ transport the Hub will use for '
144 'client connections. Both engine-transport and client-transport can be set simultaneously '
144 'client connections. Both engine-transport and client-transport can be set simultaneously '
145 'via --transport [default: tcp]',
145 'via --transport [default: tcp]',
146 metavar='Hub.client_transport')
146 metavar='Hub.client_transport')
147 paa('--query',
147 paa('--query',
148 type=int, dest='HubFactory.query_port',
148 type=int, dest='HubFactory.query_port',
149 help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
149 help='The port on which the Hub XREP socket will listen for result queries from clients [default: random]',
150 metavar='Hub.query_port')
150 metavar='Hub.query_port')
151 paa('--notifier',
151 paa('--notifier',
152 type=int, dest='HubFactory.notifier_port',
152 type=int, dest='HubFactory.notifier_port',
153 help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
153 help='The port on which the Hub PUB socket will listen for notification connections [default: random]',
154 metavar='Hub.notifier_port')
154 metavar='Hub.notifier_port')
155
155
156 # Engine config
156 # Engine config
157 paa('--engine-ip',
157 paa('--engine-ip',
158 type=str, dest='HubFactory.engine_ip',
158 type=str, dest='HubFactory.engine_ip',
159 help='The IP address or hostname the Hub will listen on for '
159 help='The IP address or hostname the Hub will listen on for '
160 'engine connections. This applies to the Hub and its schedulers. '
160 'engine connections. This applies to the Hub and its schedulers. '
161 'Both engine-ip and client-ip can be set simultaneously '
161 'Both engine-ip and client-ip can be set simultaneously '
162 'via --ip [default: loopback]',
162 'via --ip [default: loopback]',
163 metavar='Hub.engine_ip')
163 metavar='Hub.engine_ip')
164 paa('--engine-transport',
164 paa('--engine-transport',
165 type=str, dest='HubFactory.engine_transport',
165 type=str, dest='HubFactory.engine_transport',
166 help='The ZeroMQ transport the Hub will use for '
166 help='The ZeroMQ transport the Hub will use for '
167 'engine connections. Both engine-transport and client-transport can be set simultaneously '
167 'engine connections. Both engine-transport and client-transport can be set simultaneously '
168 'via --transport [default: tcp]',
168 'via --transport [default: tcp]',
169 metavar='Hub.engine_transport')
169 metavar='Hub.engine_transport')
170
170
171 # Scheduler config
171 # Scheduler config
172 paa('--mux',
172 paa('--mux',
173 type=int, dest='ControllerFactory.mux', nargs=2,
173 type=int, dest='ControllerFactory.mux', nargs=2,
174 help='The (2) ports the MUX scheduler will listen on for client,engine '
174 help='The (2) ports the MUX scheduler will listen on for client,engine '
175 'connections, respectively [default: random]',
175 'connections, respectively [default: random]',
176 metavar='Scheduler.mux_ports')
176 metavar='Scheduler.mux_ports')
177 paa('--task',
177 paa('--task',
178 type=int, dest='ControllerFactory.task', nargs=2,
178 type=int, dest='ControllerFactory.task', nargs=2,
179 help='The (2) ports the Task scheduler will listen on for client,engine '
179 help='The (2) ports the Task scheduler will listen on for client,engine '
180 'connections, respectively [default: random]',
180 'connections, respectively [default: random]',
181 metavar='Scheduler.task_ports')
181 metavar='Scheduler.task_ports')
182 paa('--control',
182 paa('--control',
183 type=int, dest='ControllerFactory.control', nargs=2,
183 type=int, dest='ControllerFactory.control', nargs=2,
184 help='The (2) ports the Control scheduler will listen on for client,engine '
184 help='The (2) ports the Control scheduler will listen on for client,engine '
185 'connections, respectively [default: random]',
185 'connections, respectively [default: random]',
186 metavar='Scheduler.control_ports')
186 metavar='Scheduler.control_ports')
187 paa('--iopub',
187 paa('--iopub',
188 type=int, dest='ControllerFactory.iopub', nargs=2,
188 type=int, dest='ControllerFactory.iopub', nargs=2,
189 help='The (2) ports the IOPub scheduler will listen on for client,engine '
189 help='The (2) ports the IOPub scheduler will listen on for client,engine '
190 'connections, respectively [default: random]',
190 'connections, respectively [default: random]',
191 metavar='Scheduler.iopub_ports')
191 metavar='Scheduler.iopub_ports')
192 paa('--scheme',
192 paa('--scheme',
193 type=str, dest='ControllerFactory.scheme',
193 type=str, dest='HubFactory.scheme',
194 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
194 choices = ['pure', 'lru', 'plainrandom', 'weighted', 'twobin','leastload'],
195 help='select the task scheduler scheme [default: Python LRU]',
195 help='select the task scheduler scheme [default: Python LRU]',
196 metavar='Scheduler.scheme')
196 metavar='Scheduler.scheme')
197 paa('--usethreads',
197 paa('--usethreads',
198 dest='ControllerFactory.usethreads', action="store_true",
198 dest='ControllerFactory.usethreads', action="store_true",
199 help='Use threads instead of processes for the schedulers',
199 help='Use threads instead of processes for the schedulers',
200 )
200 )
201
201
202 ## Global config
202 ## Global config
203 paa('--log-to-file',
203 paa('--log-to-file',
204 action='store_true', dest='Global.log_to_file',
204 action='store_true', dest='Global.log_to_file',
205 help='Log to a file in the log directory (default is stdout)')
205 help='Log to a file in the log directory (default is stdout)')
206 paa('--log-url',
206 paa('--log-url',
207 type=str, dest='Global.log_url',
207 type=str, dest='Global.log_url',
208 help='Broadcast logs to an iploggerz process [default: disabled]')
208 help='Broadcast logs to an iploggerz process [default: disabled]')
209 paa('-r','--reuse-key',
209 paa('-r','--reuse-key',
210 action='store_true', dest='Global.reuse_key',
210 action='store_true', dest='Global.reuse_key',
211 help='Try to reuse existing execution keys.')
211 help='Try to reuse existing execution keys.')
212 paa('--no-secure',
212 paa('--no-secure',
213 action='store_false', dest='Global.secure',
213 action='store_false', dest='Global.secure',
214 help='Turn off execution keys.')
214 help='Turn off execution keys.')
215 paa('--secure',
215 paa('--secure',
216 action='store_true', dest='Global.secure',
216 action='store_true', dest='Global.secure',
217 help='Turn on execution keys (default).')
217 help='Turn on execution keys (default).')
218 paa('--execkey',
218 paa('--execkey',
219 type=str, dest='Global.exec_key',
219 type=str, dest='Global.exec_key',
220 help='path to a file containing an execution key.',
220 help='path to a file containing an execution key.',
221 metavar='keyfile')
221 metavar='keyfile')
222 paa('--ssh',
222 paa('--ssh',
223 type=str, dest='Global.sshserver',
223 type=str, dest='Global.sshserver',
224 help='ssh url for clients to use when connecting to the Controller '
224 help='ssh url for clients to use when connecting to the Controller '
225 'processes. It should be of the form: [user@]server[:port]. The '
225 'processes. It should be of the form: [user@]server[:port]. The '
226 'Controller\'s listening addresses must be accessible from the ssh server',
226 'Controller\'s listening addresses must be accessible from the ssh server',
227 metavar='Global.sshserver')
227 metavar='Global.sshserver')
228 paa('--location',
228 paa('--location',
229 type=str, dest='Global.location',
229 type=str, dest='Global.location',
230 help="The external IP or domain name of this machine, used for disambiguating "
230 help="The external IP or domain name of this machine, used for disambiguating "
231 "engine and client connections.",
231 "engine and client connections.",
232 metavar='Global.location')
232 metavar='Global.location')
233 factory.add_session_arguments(self.parser)
233 factory.add_session_arguments(self.parser)
234 factory.add_registration_arguments(self.parser)
234 factory.add_registration_arguments(self.parser)
235
235
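# For illustration, a hypothetical invocation such as
#   ipcontrollerz --scheme weighted --mux 5555 5556 --usethreads
# (values made up, not defaults) would populate HubFactory.scheme,
# ControllerFactory.mux, and ControllerFactory.usethreads in the loaded
# config, per the dest values declared above.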
236
236
237 #-----------------------------------------------------------------------------
237 #-----------------------------------------------------------------------------
238 # The main application
238 # The main application
239 #-----------------------------------------------------------------------------
239 #-----------------------------------------------------------------------------
240
240
241
241
242 class IPControllerApp(ApplicationWithClusterDir):
242 class IPControllerApp(ApplicationWithClusterDir):
243
243
244 name = u'ipcontrollerz'
244 name = u'ipcontrollerz'
245 description = _description
245 description = _description
246 command_line_loader = IPControllerAppConfigLoader
246 command_line_loader = IPControllerAppConfigLoader
247 default_config_file_name = default_config_file_name
247 default_config_file_name = default_config_file_name
248 auto_create_cluster_dir = True
248 auto_create_cluster_dir = True
249
249
250
250
251 def create_default_config(self):
251 def create_default_config(self):
252 super(IPControllerApp, self).create_default_config()
252 super(IPControllerApp, self).create_default_config()
253 # Don't set defaults for Global.secure or Global.reuse_furls
253 # Don't set defaults for Global.secure or Global.reuse_furls
254 # as those are set in a component.
254 # as those are set in a component.
255 self.default_config.Global.import_statements = []
255 self.default_config.Global.import_statements = []
256 self.default_config.Global.clean_logs = True
256 self.default_config.Global.clean_logs = True
257 self.default_config.Global.secure = True
257 self.default_config.Global.secure = True
258 self.default_config.Global.reuse_key = False
258 self.default_config.Global.reuse_key = False
259 self.default_config.Global.exec_key = "exec_key.key"
259 self.default_config.Global.exec_key = "exec_key.key"
260 self.default_config.Global.sshserver = None
260 self.default_config.Global.sshserver = None
261 self.default_config.Global.location = None
261 self.default_config.Global.location = None
262
262
263 def pre_construct(self):
263 def pre_construct(self):
264 super(IPControllerApp, self).pre_construct()
264 super(IPControllerApp, self).pre_construct()
265 c = self.master_config
265 c = self.master_config
266 # The defaults for these are set in FCClientServiceFactory and
266 # The defaults for these are set in FCClientServiceFactory and
267 # FCEngineServiceFactory, so we only set them here if the global
267 # FCEngineServiceFactory, so we only set them here if the global
268 # options have been set to override the class level defaults.
268 # options have been set to override the class level defaults.
269
269
270 # if hasattr(c.Global, 'reuse_furls'):
270 # if hasattr(c.Global, 'reuse_furls'):
271 # c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
271 # c.FCClientServiceFactory.reuse_furls = c.Global.reuse_furls
272 # c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
272 # c.FCEngineServiceFactory.reuse_furls = c.Global.reuse_furls
273 # del c.Global.reuse_furls
273 # del c.Global.reuse_furls
274 # if hasattr(c.Global, 'secure'):
274 # if hasattr(c.Global, 'secure'):
275 # c.FCClientServiceFactory.secure = c.Global.secure
275 # c.FCClientServiceFactory.secure = c.Global.secure
276 # c.FCEngineServiceFactory.secure = c.Global.secure
276 # c.FCEngineServiceFactory.secure = c.Global.secure
277 # del c.Global.secure
277 # del c.Global.secure
278
278
279 def save_connection_dict(self, fname, cdict):
279 def save_connection_dict(self, fname, cdict):
280 """save a connection dict to json file."""
280 """save a connection dict to json file."""
281 c = self.master_config
281 c = self.master_config
282 url = cdict['url']
282 url = cdict['url']
283 location = cdict['location']
283 location = cdict['location']
284 if not location:
284 if not location:
285 try:
285 try:
286 proto,ip,port = split_url(url)
286 proto,ip,port = split_url(url)
287 except AssertionError:
287 except AssertionError:
288 pass
288 pass
289 else:
289 else:
290 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
290 location = socket.gethostbyname_ex(socket.gethostname())[2][-1]
291 cdict['location'] = location
291 cdict['location'] = location
292 fname = os.path.join(c.Global.security_dir, fname)
292 fname = os.path.join(c.Global.security_dir, fname)
293 with open(fname, 'w') as f:
293 with open(fname, 'w') as f:
294 f.write(json.dumps(cdict, indent=2))
294 f.write(json.dumps(cdict, indent=2))
295 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
295 os.chmod(fname, stat.S_IRUSR|stat.S_IWUSR)
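# A client-side sketch of consuming the file written above; this helper is
# hypothetical (not part of this module), but the keys mirror the cdict
# built in construct() below: 'url', 'location', 'exec_key', 'ssh'.
import json

def load_connection_dict(fname):
    """Read a connection file written by save_connection_dict (sketch)."""
    with open(fname) as f:
        return json.load(f)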
296
296
297 def construct(self):
297 def construct(self):
298 # This is the working dir by now.
298 # This is the working dir by now.
299 sys.path.insert(0, '')
299 sys.path.insert(0, '')
300 c = self.master_config
300 c = self.master_config
301
301
302 self.import_statements()
302 self.import_statements()
303
303
304 if c.Global.secure:
304 if c.Global.secure:
305 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
305 keyfile = os.path.join(c.Global.security_dir, c.Global.exec_key)
306 if not c.Global.reuse_key or not os.path.exists(keyfile):
306 if not c.Global.reuse_key or not os.path.exists(keyfile):
307 key = str(uuid.uuid4())
307 key = str(uuid.uuid4())
308 with open(keyfile, 'w') as f:
308 with open(keyfile, 'w') as f:
309 f.write(key)
309 f.write(key)
310 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
310 os.chmod(keyfile, stat.S_IRUSR|stat.S_IWUSR)
311 else:
311 else:
312 with open(keyfile) as f:
312 with open(keyfile) as f:
313 key = f.read().strip()
313 key = f.read().strip()
314 c.SessionFactory.exec_key = key
314 c.SessionFactory.exec_key = key
315 else:
315 else:
316 c.SessionFactory.exec_key = ''
316 c.SessionFactory.exec_key = ''
317 key = None
317 key = None
318
318
319 try:
319 try:
320 self.factory = ControllerFactory(config=c, logname=self.log.name)
320 self.factory = ControllerFactory(config=c, logname=self.log.name)
321 self.start_logging()
321 self.start_logging()
322 self.factory.construct()
322 self.factory.construct()
323 except:
323 except:
324 self.log.error("Couldn't construct the Controller", exc_info=True)
324 self.log.error("Couldn't construct the Controller", exc_info=True)
325 self.exit(1)
325 self.exit(1)
326
326
327 f = self.factory
327 f = self.factory
328 cdict = {'exec_key' : key,
328 cdict = {'exec_key' : key,
329 'ssh' : c.Global.sshserver,
329 'ssh' : c.Global.sshserver,
330 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
330 'url' : "%s://%s:%s"%(f.client_transport, f.client_ip, f.regport),
331 'location' : c.Global.location
331 'location' : c.Global.location
332 }
332 }
333 self.save_connection_dict('ipcontroller-client.json', cdict)
333 self.save_connection_dict('ipcontroller-client.json', cdict)
334 edict = cdict
334 edict = cdict
335 edict['url'] = "%s://%s:%s"%(f.engine_transport, f.engine_ip, f.regport) # engine-facing url, cf. save_urls
335 edict['url'] = "%s://%s:%s"%(f.engine_transport, f.engine_ip, f.regport) # engine-facing url, cf. save_urls
336 self.save_connection_dict('ipcontroller-engine.json', edict)
336 self.save_connection_dict('ipcontroller-engine.json', edict)
337
337
338
338
339 def save_urls(self):
339 def save_urls(self):
340 """save the registration urls to files."""
340 """save the registration urls to files."""
341 c = self.master_config
341 c = self.master_config
342
342
343 sec_dir = c.Global.security_dir
343 sec_dir = c.Global.security_dir
344 cf = self.factory
344 cf = self.factory
345
345
346 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
346 with open(os.path.join(sec_dir, 'ipcontroller-engine.url'), 'w') as f:
347 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
347 f.write("%s://%s:%s"%(cf.engine_transport, cf.engine_ip, cf.regport))
348
348
349 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
349 with open(os.path.join(sec_dir, 'ipcontroller-client.url'), 'w') as f:
350 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
350 f.write("%s://%s:%s"%(cf.client_transport, cf.client_ip, cf.regport))
351
351
352
352
353 def import_statements(self):
353 def import_statements(self):
354 statements = self.master_config.Global.import_statements
354 statements = self.master_config.Global.import_statements
355 for s in statements:
355 for s in statements:
356 try:
356 try:
357 self.log.info("Executing statement: '%s'" % s)
357 self.log.info("Executing statement: '%s'" % s)
358 exec s in globals(), locals()
358 exec s in globals(), locals()
359 except:
359 except:
360 self.log.error("Error running statement: %s" % s)
360 self.log.error("Error running statement: %s" % s)
361
361
362 def start_logging(self):
362 def start_logging(self):
363 super(IPControllerApp, self).start_logging()
363 super(IPControllerApp, self).start_logging()
364 if self.master_config.Global.log_url:
364 if self.master_config.Global.log_url:
365 context = self.factory.context
365 context = self.factory.context
366 lsock = context.socket(zmq.PUB)
366 lsock = context.socket(zmq.PUB)
367 lsock.connect(self.master_config.Global.log_url)
367 lsock.connect(self.master_config.Global.log_url)
368 handler = PUBHandler(lsock)
368 handler = PUBHandler(lsock)
369 handler.root_topic = 'controller'
369 handler.root_topic = 'controller'
370 handler.setLevel(self.log_level)
370 handler.setLevel(self.log_level)
371 self.log.addHandler(handler)
371 self.log.addHandler(handler)
372 #
372 #
373 def start_app(self):
373 def start_app(self):
374 # Start the subprocesses:
374 # Start the subprocesses:
375 self.factory.start()
375 self.factory.start()
376 self.write_pid_file(overwrite=True)
376 self.write_pid_file(overwrite=True)
377 try:
377 try:
378 self.factory.loop.start()
378 self.factory.loop.start()
379 except KeyboardInterrupt:
379 except KeyboardInterrupt:
380 self.log.critical("Interrupted, Exiting...\n")
380 self.log.critical("Interrupted, Exiting...\n")
381
381
382
382
383 def launch_new_instance():
383 def launch_new_instance():
384 """Create and run the IPython controller"""
384 """Create and run the IPython controller"""
385 app = IPControllerApp()
385 app = IPControllerApp()
386 app.start()
386 app.start()
387
387
388
388
389 if __name__ == '__main__':
389 if __name__ == '__main__':
390 launch_new_instance()
390 launch_new_instance()
@@ -1,545 +1,542 b''
1 """The Python scheduler for rich scheduling.
1 """The Python scheduler for rich scheduling.
2
2
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
3 The Pure ZMQ scheduler does not allow routing schemes other than LRU,
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
4 nor does it check msg_id DAG dependencies. For those, a slightly slower
5 Python Scheduler exists.
5 Python Scheduler exists.
6 """
6 """
7
7
8 #----------------------------------------------------------------------
8 #----------------------------------------------------------------------
9 # Imports
9 # Imports
10 #----------------------------------------------------------------------
10 #----------------------------------------------------------------------
11
11
12 from __future__ import print_function
12 from __future__ import print_function
13 import sys
13 import sys
14 import logging
14 import logging
15 from random import randint, random
15 from random import randint, random
16 from types import FunctionType
16 from types import FunctionType
17 from datetime import datetime, timedelta
17 from datetime import datetime, timedelta
18 try:
18 try:
19 import numpy
19 import numpy
20 except ImportError:
20 except ImportError:
21 numpy = None
21 numpy = None
22
22
23 import zmq
23 import zmq
24 from zmq.eventloop import ioloop, zmqstream
24 from zmq.eventloop import ioloop, zmqstream
25
25
26 # local imports
26 # local imports
27 from IPython.external.decorator import decorator
27 from IPython.external.decorator import decorator
28 # from IPython.config.configurable import Configurable
28 # from IPython.config.configurable import Configurable
29 from IPython.utils.traitlets import Instance, Dict, List, Set
29 from IPython.utils.traitlets import Instance, Dict, List, Set
30
30
31 import error
31 import error
32 # from client import Client
32 # from client import Client
33 from dependency import Dependency
33 from dependency import Dependency
34 import streamsession as ss
34 import streamsession as ss
35 from entry_point import connect_logger, local_logger
35 from entry_point import connect_logger, local_logger
36 from factory import SessionFactory
36 from factory import SessionFactory
37
37
38
38
39 @decorator
39 @decorator
40 def logged(f,self,*args,**kwargs):
40 def logged(f,self,*args,**kwargs):
41 # print ("#--------------------")
41 # print ("#--------------------")
42 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
42 self.log.debug("scheduler::%s(*%s,**%s)"%(f.func_name, args, kwargs))
43 # print ("#--")
43 # print ("#--")
44 return f(self,*args, **kwargs)
44 return f(self,*args, **kwargs)
45
45
46 #----------------------------------------------------------------------
46 #----------------------------------------------------------------------
47 # Chooser functions
47 # Chooser functions
48 #----------------------------------------------------------------------
48 #----------------------------------------------------------------------
49
49
50 def plainrandom(loads):
50 def plainrandom(loads):
51 """Plain random pick."""
51 """Plain random pick."""
52 n = len(loads)
52 n = len(loads)
53 return randint(0,n-1)
53 return randint(0,n-1)
54
54
55 def lru(loads):
55 def lru(loads):
56 """Always pick the front of the line.
56 """Always pick the front of the line.
57
57
58 The content of `loads` is ignored.
58 The content of `loads` is ignored.
59
59
60 Assumes LRU ordering of loads, with oldest first.
60 Assumes LRU ordering of loads, with oldest first.
61 """
61 """
62 return 0
62 return 0
63
63
64 def twobin(loads):
64 def twobin(loads):
65 """Pick two at random, use the LRU of the two.
65 """Pick two at random, use the LRU of the two.
66
66
67 The content of loads is ignored.
67 The content of loads is ignored.
68
68
69 Assumes LRU ordering of loads, with oldest first.
69 Assumes LRU ordering of loads, with oldest first.
70 """
70 """
71 n = len(loads)
71 n = len(loads)
72 a = randint(0,n-1)
72 a = randint(0,n-1)
73 b = randint(0,n-1)
73 b = randint(0,n-1)
74 return min(a,b)
74 return min(a,b)
75
75
76 def weighted(loads):
76 def weighted(loads):
77 """Pick two at random using inverse load as weight.
77 """Pick two at random using inverse load as weight.
78
78
79 Return the less loaded of the two.
79 Return the less loaded of the two.
80 """
80 """
81 # weight 0 a million times more than 1:
81 # weight 0 a million times more than 1:
82 weights = 1./(1e-6+numpy.array(loads))
82 weights = 1./(1e-6+numpy.array(loads))
83 sums = weights.cumsum()
83 sums = weights.cumsum()
84 t = sums[-1]
84 t = sums[-1]
85 x = random()*t
85 x = random()*t
86 y = random()*t
86 y = random()*t
87 idx = 0
87 idx = 0
88 idy = 0
88 idy = 0
89 while sums[idx] < x:
89 while sums[idx] < x:
90 idx += 1
90 idx += 1
91 while sums[idy] < y:
91 while sums[idy] < y:
92 idy += 1
92 idy += 1
93 if weights[idy] > weights[idx]:
93 if weights[idy] > weights[idx]:
94 return idy
94 return idy
95 else:
95 else:
96 return idx
96 return idx
97
97
98 def leastload(loads):
98 def leastload(loads):
99 """Always choose the lowest load.
99 """Always choose the lowest load.
100
100
101 If the lowest load occurs more than once, the first
101 If the lowest load occurs more than once, the first
102 occurrence will be used. If loads has LRU ordering, this means
102 occurrence will be used. If loads has LRU ordering, this means
103 the LRU of those with the lowest load is chosen.
103 the LRU of those with the lowest load is chosen.
104 """
104 """
105 return loads.index(min(loads))
105 return loads.index(min(loads))
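# A quick self-contained check of the deterministic choosers above (the
# load list is made up; the random choosers are only range-checked):
loads = [3, 0, 2, 0]                  # outstanding jobs per engine, LRU order
assert lru(loads) == 0                # head of the line, loads ignored
assert leastload(loads) == 1          # first occurrence of the minimum
assert 0 <= plainrandom(loads) < len(loads)
assert 0 <= twobin(loads) < len(loads)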
106
106
107 #---------------------------------------------------------------------
107 #---------------------------------------------------------------------
108 # Classes
108 # Classes
109 #---------------------------------------------------------------------
109 #---------------------------------------------------------------------
110 # store empty default dependency:
110 # store empty default dependency:
111 MET = Dependency([])
111 MET = Dependency([])
112
112
113 class TaskScheduler(SessionFactory):
113 class TaskScheduler(SessionFactory):
114 """Python TaskScheduler object.
114 """Python TaskScheduler object.
115
115
116 This is the simplest object that supports msg_id based
116 This is the simplest object that supports msg_id based
117 DAG dependencies. *Only* task msg_ids are checked, not
117 DAG dependencies. *Only* task msg_ids are checked, not
118 msg_ids of jobs submitted via the MUX queue.
118 msg_ids of jobs submitted via the MUX queue.
119
119
120 """
120 """
121
121
122 # input arguments:
122 # input arguments:
123 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
123 scheme = Instance(FunctionType, default=leastload) # function for determining the destination
124 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
124 client_stream = Instance(zmqstream.ZMQStream) # client-facing stream
125 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
125 engine_stream = Instance(zmqstream.ZMQStream) # engine-facing stream
126 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
126 notifier_stream = Instance(zmqstream.ZMQStream) # hub-facing sub stream
127 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
127 mon_stream = Instance(zmqstream.ZMQStream) # hub-facing pub stream
128
128
129 # internals:
129 # internals:
130 dependencies = Dict() # dict by msg_id of [ msg_ids that depend on key ]
130 dependencies = Dict() # dict by msg_id of [ msg_ids that depend on key ]
131 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
131 depending = Dict() # dict by msg_id of (msg_id, raw_msg, after, follow)
132 pending = Dict() # dict by engine_uuid of submitted tasks
132 pending = Dict() # dict by engine_uuid of submitted tasks
133 completed = Dict() # dict by engine_uuid of completed tasks
133 completed = Dict() # dict by engine_uuid of completed tasks
134 failed = Dict() # dict by engine_uuid of failed tasks
134 failed = Dict() # dict by engine_uuid of failed tasks
135 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
135 destinations = Dict() # dict by msg_id of engine_uuids where jobs ran (reverse of completed+failed)
136 clients = Dict() # dict by msg_id for who submitted the task
136 clients = Dict() # dict by msg_id for who submitted the task
137 targets = List() # list of target IDENTs
137 targets = List() # list of target IDENTs
138 loads = List() # list of engine loads
138 loads = List() # list of engine loads
139 all_completed = Set() # set of all completed tasks
139 all_completed = Set() # set of all completed tasks
140 all_failed = Set() # set of all failed tasks
140 all_failed = Set() # set of all failed tasks
141 all_done = Set() # set of all finished tasks=union(completed,failed)
141 all_done = Set() # set of all finished tasks=union(completed,failed)
142 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
142 blacklist = Dict() # dict by msg_id of locations where a job has encountered UnmetDependency
143 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
143 auditor = Instance('zmq.eventloop.ioloop.PeriodicCallback')
144
144
145
145
146 def start(self):
146 def start(self):
147 self.engine_stream.on_recv(self.dispatch_result, copy=False)
147 self.engine_stream.on_recv(self.dispatch_result, copy=False)
148 self._notification_handlers = dict(
148 self._notification_handlers = dict(
149 registration_notification = self._register_engine,
149 registration_notification = self._register_engine,
150 unregistration_notification = self._unregister_engine
150 unregistration_notification = self._unregister_engine
151 )
151 )
152 self.notifier_stream.on_recv(self.dispatch_notification)
152 self.notifier_stream.on_recv(self.dispatch_notification)
153 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2 seconds
153 self.auditor = ioloop.PeriodicCallback(self.audit_timeouts, 2e3, self.loop) # every 2 seconds
154 self.auditor.start()
154 self.auditor.start()
155 self.log.info("Scheduler started...%r"%self)
155 self.log.info("Scheduler started...%r"%self)
156
156
157 def resume_receiving(self):
157 def resume_receiving(self):
158 """Resume accepting jobs."""
158 """Resume accepting jobs."""
159 self.client_stream.on_recv(self.dispatch_submission, copy=False)
159 self.client_stream.on_recv(self.dispatch_submission, copy=False)
160
160
161 def stop_receiving(self):
161 def stop_receiving(self):
162 """Stop accepting jobs while there are no engines.
162 """Stop accepting jobs while there are no engines.
163 Leave them in the ZMQ queue."""
163 Leave them in the ZMQ queue."""
164 self.client_stream.on_recv(None)
164 self.client_stream.on_recv(None)
165
165
166 #-----------------------------------------------------------------------
166 #-----------------------------------------------------------------------
167 # [Un]Registration Handling
167 # [Un]Registration Handling
168 #-----------------------------------------------------------------------
168 #-----------------------------------------------------------------------
169
169
170 def dispatch_notification(self, msg):
170 def dispatch_notification(self, msg):
171 """dispatch register/unregister events."""
171 """dispatch register/unregister events."""
172 idents,msg = self.session.feed_identities(msg)
172 idents,msg = self.session.feed_identities(msg)
173 msg = self.session.unpack_message(msg)
173 msg = self.session.unpack_message(msg)
174 msg_type = msg['msg_type']
174 msg_type = msg['msg_type']
175 handler = self._notification_handlers.get(msg_type, None)
175 handler = self._notification_handlers.get(msg_type, None)
176 if handler is None:
176 if handler is None:
177 raise Exception("Unhandled message type: %s"%msg_type)
177 raise Exception("Unhandled message type: %s"%msg_type)
178 else:
178 else:
179 try:
179 try:
180 handler(str(msg['content']['queue']))
180 handler(str(msg['content']['queue']))
181 except KeyError:
181 except KeyError:
182 self.log.error("task::Invalid notification msg: %s"%msg)
182 self.log.error("task::Invalid notification msg: %s"%msg)
183
183
184 @logged
184 @logged
185 def _register_engine(self, uid):
185 def _register_engine(self, uid):
186 """New engine with ident `uid` became available."""
186 """New engine with ident `uid` became available."""
187 # head of the line:
187 # head of the line:
188 self.targets.insert(0,uid)
188 self.targets.insert(0,uid)
189 self.loads.insert(0,0)
189 self.loads.insert(0,0)
190 # initialize sets
190 # initialize sets
191 self.completed[uid] = set()
191 self.completed[uid] = set()
192 self.failed[uid] = set()
192 self.failed[uid] = set()
193 self.pending[uid] = {}
193 self.pending[uid] = {}
194 if len(self.targets) == 1:
194 if len(self.targets) == 1:
195 self.resume_receiving()
195 self.resume_receiving()
196
196
197 def _unregister_engine(self, uid):
197 def _unregister_engine(self, uid):
198 """Existing engine with ident `uid` became unavailable."""
198 """Existing engine with ident `uid` became unavailable."""
199 if len(self.targets) == 1:
199 if len(self.targets) == 1:
200 # this was our only engine
200 # this was our only engine
201 self.stop_receiving()
201 self.stop_receiving()
202
202
203 # handle any potentially finished tasks:
203 # handle any potentially finished tasks:
204 self.engine_stream.flush()
204 self.engine_stream.flush()
205
205
206 self.completed.pop(uid)
206 self.completed.pop(uid)
207 self.failed.pop(uid)
207 self.failed.pop(uid)
208 # don't pop destinations, because it might be used later
208 # don't pop destinations, because it might be used later
209 # map(self.destinations.pop, self.completed.pop(uid))
209 # map(self.destinations.pop, self.completed.pop(uid))
210 # map(self.destinations.pop, self.failed.pop(uid))
210 # map(self.destinations.pop, self.failed.pop(uid))
211
211
212 idx = self.targets.index(uid)
212 idx = self.targets.index(uid)
213 self.targets.pop(idx)
213 self.targets.pop(idx)
214 self.loads.pop(idx)
214 self.loads.pop(idx)
215
215
216 # wait 5 seconds before cleaning up pending jobs, since the results might
216 # wait 5 seconds before cleaning up pending jobs, since the results might
217 # still be incoming
217 # still be incoming
218 if self.pending[uid]:
218 if self.pending[uid]:
219 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
219 dc = ioloop.DelayedCallback(lambda : self.handle_stranded_tasks(uid), 5000, self.loop)
220 dc.start()
220 dc.start()
221
221
222 @logged
222 @logged
223 def handle_stranded_tasks(self, engine):
223 def handle_stranded_tasks(self, engine):
224 """Deal with jobs resident in an engine that died."""
224 """Deal with jobs resident in an engine that died."""
225 lost = self.pending.pop(engine)
225 lost = self.pending.pop(engine)
226
226
227 for msg_id, (raw_msg,follow) in lost.iteritems():
227 for msg_id, (raw_msg,follow) in lost.iteritems():
228 self.all_failed.add(msg_id)
228 self.all_failed.add(msg_id)
229 self.all_done.add(msg_id)
229 self.all_done.add(msg_id)
230 idents,msg = self.session.feed_identities(raw_msg, copy=False)
230 idents,msg = self.session.feed_identities(raw_msg, copy=False)
231 msg = self.session.unpack_message(msg, copy=False, content=False)
231 msg = self.session.unpack_message(msg, copy=False, content=False)
232 parent = msg['header']
232 parent = msg['header']
233 idents = [idents[0],engine]+idents[1:]
233 idents = [idents[0],engine]+idents[1:]
234 print (idents)
234 print (idents)
235 try:
235 try:
236 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
236 raise error.EngineError("Engine %r died while running task %r"%(engine, msg_id))
237 except:
237 except:
238 content = ss.wrap_exception()
238 content = ss.wrap_exception()
239 msg = self.session.send(self.client_stream, 'apply_reply', content,
239 msg = self.session.send(self.client_stream, 'apply_reply', content,
240 parent=parent, ident=idents)
240 parent=parent, ident=idents)
241 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
241 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
242 self.update_dependencies(msg_id)
242 self.update_dependencies(msg_id)
243
243
244
244
245 #-----------------------------------------------------------------------
245 #-----------------------------------------------------------------------
246 # Job Submission
246 # Job Submission
247 #-----------------------------------------------------------------------
247 #-----------------------------------------------------------------------
248 @logged
248 @logged
249 def dispatch_submission(self, raw_msg):
249 def dispatch_submission(self, raw_msg):
250 """Dispatch job submission to appropriate handlers."""
250 """Dispatch job submission to appropriate handlers."""
251 # ensure targets up to date:
251 # ensure targets up to date:
252 self.notifier_stream.flush()
252 self.notifier_stream.flush()
253 try:
253 try:
254 idents, msg = self.session.feed_identities(raw_msg, copy=False)
254 idents, msg = self.session.feed_identities(raw_msg, copy=False)
255 except Exception as e:
255 except Exception as e:
256 self.log.error("task::Invaid msg: %s"%msg)
256 self.log.error("task::Invaid msg: %s"%msg)
257 return
257 return
258
258
259 # send to monitor
259 # send to monitor
260 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
260 self.mon_stream.send_multipart(['intask']+raw_msg, copy=False)
261
261
262 msg = self.session.unpack_message(msg, content=False, copy=False)
262 msg = self.session.unpack_message(msg, content=False, copy=False)
263 header = msg['header']
263 header = msg['header']
264 msg_id = header['msg_id']
264 msg_id = header['msg_id']
265
265
266 # time dependencies
266 # time dependencies
267 after = Dependency(header.get('after', []))
267 after = Dependency(header.get('after', []))
268 if after.mode == 'all':
268 if after.mode == 'all':
269 after.difference_update(self.all_completed)
269 after.difference_update(self.all_completed)
270 if not after.success_only:
270 if not after.success_only:
271 after.difference_update(self.all_failed)
271 after.difference_update(self.all_failed)
272 if after.check(self.all_completed, self.all_failed):
272 if after.check(self.all_completed, self.all_failed):
273 # recast as empty set, if `after` already met,
273 # recast as empty set, if `after` already met,
274 # to prevent unnecessary set comparisons
274 # to prevent unnecessary set comparisons
275 after = MET
275 after = MET
276
276
277 # location dependencies
277 # location dependencies
278 follow = Dependency(header.get('follow', []))
278 follow = Dependency(header.get('follow', []))
279 # check if unreachable:
279 # check if unreachable:
280 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
280 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
281 self.depending[msg_id] = [raw_msg,MET,MET,None]
281 self.depending[msg_id] = [raw_msg,MET,MET,None]
282 return self.fail_unreachable(msg_id)
282 return self.fail_unreachable(msg_id)
283
283
284 # turn timeouts into datetime objects:
284 # turn timeouts into datetime objects:
285 timeout = header.get('timeout', None)
285 timeout = header.get('timeout', None)
286 if timeout:
286 if timeout:
287 timeout = datetime.now() + timedelta(0,timeout,0)
287 timeout = datetime.now() + timedelta(0,timeout,0)
288
288
289 if after.check(self.all_completed, self.all_failed):
289 if after.check(self.all_completed, self.all_failed):
290 # time deps already met, try to run
290 # time deps already met, try to run
291 if not self.maybe_run(msg_id, raw_msg, follow):
291 if not self.maybe_run(msg_id, raw_msg, follow):
292 # can't run yet
292 # can't run yet
293 self.save_unmet(msg_id, raw_msg, after, follow, timeout)
293 self.save_unmet(msg_id, raw_msg, after, follow, timeout)
294 else:
294 else:
295 self.save_unmet(msg_id, raw_msg, after, follow, timeout)
295 self.save_unmet(msg_id, raw_msg, after, follow, timeout)
296
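# A sketch of the time-dependency gate above, assuming (as the dispatch
# logic does) that Dependency.check(completed, failed) is True once every
# listed msg_id has finished; the msg_ids here are made up.
after = Dependency(['msg_a', 'msg_b'])
done, failed = set(['msg_a']), set()
assert not after.check(done, failed)  # 'msg_b' outstanding -> save_unmet
done.add('msg_b')
assert after.check(done, failed)      # time deps met -> maybe_run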
296
297 @logged
297 # @logged
298 def audit_timeouts(self):
298 def audit_timeouts(self):
299 """Audit all waiting tasks for expired timeouts."""
299 """Audit all waiting tasks for expired timeouts."""
300 now = datetime.now()
300 now = datetime.now()
301 for msg_id in self.depending.keys():
301 for msg_id in self.depending.keys():
302 # must recheck, in case one failure cascaded to another:
302 # must recheck, in case one failure cascaded to another:
303 if msg_id in self.depending:
303 if msg_id in self.depending:
304 raw,after,follow,timeout = self.depending[msg_id]
304 raw,after,follow,timeout = self.depending[msg_id]
305 if timeout and timeout < now:
305 if timeout and timeout < now:
306 self.fail_unreachable(msg_id, timeout=True)
306 self.fail_unreachable(msg_id, timeout=True)
307
307
308 @logged
308 @logged
309 def fail_unreachable(self, msg_id, timeout=False):
309 def fail_unreachable(self, msg_id, timeout=False):
310 """a message has become unreachable"""
310 """a message has become unreachable"""
311 if msg_id not in self.depending:
311 if msg_id not in self.depending:
312 self.log.error("msg %r already failed!"%msg_id)
312 self.log.error("msg %r already failed!"%msg_id)
313 return
313 return
314 raw_msg, after, follow, timeout = self.depending.pop(msg_id)
314 raw_msg, after, follow, timeout = self.depending.pop(msg_id)
315 for mid in follow.union(after):
315 for mid in follow.union(after):
316 if mid in self.dependencies:
316 if mid in self.dependencies:
317 self.dependencies[mid].remove(msg_id)
317 self.dependencies[mid].remove(msg_id)
318
318
319 # FIXME: unpacking a message I've already unpacked, but didn't save:
319 # FIXME: unpacking a message I've already unpacked, but didn't save:
320 idents,msg = self.session.feed_identities(raw_msg, copy=False)
320 idents,msg = self.session.feed_identities(raw_msg, copy=False)
321 msg = self.session.unpack_message(msg, copy=False, content=False)
321 msg = self.session.unpack_message(msg, copy=False, content=False)
322 header = msg['header']
322 header = msg['header']
323
323
324 impossible = error.DependencyTimeout if timeout else error.ImpossibleDependency
324 impossible = error.DependencyTimeout if timeout else error.ImpossibleDependency
325
325
326 try:
326 try:
327 raise impossible()
327 raise impossible()
328 except:
328 except:
329 content = ss.wrap_exception()
329 content = ss.wrap_exception()
330
330
331 self.all_done.add(msg_id)
331 self.all_done.add(msg_id)
332 self.all_failed.add(msg_id)
332 self.all_failed.add(msg_id)
333
333
334 msg = self.session.send(self.client_stream, 'apply_reply', content,
334 msg = self.session.send(self.client_stream, 'apply_reply', content,
335 parent=header, ident=idents)
335 parent=header, ident=idents)
336 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
336 self.session.send(self.mon_stream, msg, ident=['outtask']+idents)
337
337
338 self.update_dependencies(msg_id, success=False)
338 self.update_dependencies(msg_id, success=False)
339
339
340 @logged
340 @logged
341 def maybe_run(self, msg_id, raw_msg, follow=None):
341 def maybe_run(self, msg_id, raw_msg, follow=None):
342 """check location dependencies, and run if they are met."""
342 """check location dependencies, and run if they are met."""
343
343
344 if follow:
344 if follow:
345 def can_run(idx):
345 def can_run(idx):
346 target = self.targets[idx]
346 target = self.targets[idx]
347 return target not in self.blacklist.get(msg_id, []) and\
347 return target not in self.blacklist.get(msg_id, []) and\
348 follow.check(self.completed[target], self.failed[target])
348 follow.check(self.completed[target], self.failed[target])
349
349
350 indices = filter(can_run, range(len(self.targets)))
350 indices = filter(can_run, range(len(self.targets)))
351 if not indices:
351 if not indices:
352 # TODO evaluate unmeetable follow dependencies
352 # TODO evaluate unmeetable follow dependencies
353 if follow.mode == 'all':
353 if follow.mode == 'all':
354 dests = set()
354 dests = set()
355 relevant = self.all_completed if follow.success_only else self.all_done
355 relevant = self.all_completed if follow.success_only else self.all_done
356 for m in follow.intersection(relevant):
356 for m in follow.intersection(relevant):
357 dests.add(self.destinations[m])
357 dests.add(self.destinations[m])
358 if len(dests) > 1:
358 if len(dests) > 1:
359 self.fail_unreachable(msg_id)
359 self.fail_unreachable(msg_id)
360
360
361
361
362 return False
362 return False
363 else:
363 else:
364 indices = None
364 indices = None
365
365
366 self.submit_task(msg_id, raw_msg, indices)
366 self.submit_task(msg_id, raw_msg, indices)
367 return True
367 return True
368
368
369 @logged
369 @logged
370 def save_unmet(self, msg_id, raw_msg, after, follow, timeout):
370 def save_unmet(self, msg_id, raw_msg, after, follow, timeout):
371 """Save a message for later submission when its dependencies are met."""
371 """Save a message for later submission when its dependencies are met."""
372 self.depending[msg_id] = [raw_msg,after,follow,timeout]
372 self.depending[msg_id] = [raw_msg,after,follow,timeout]
373 # track the ids in follow or after, but not those already finished
373 # track the ids in follow or after, but not those already finished
374 for dep_id in after.union(follow).difference(self.all_done):
374 for dep_id in after.union(follow).difference(self.all_done):
375 if dep_id not in self.dependencies:
375 if dep_id not in self.dependencies:
376 self.dependencies[dep_id] = set()
376 self.dependencies[dep_id] = set()
377 self.dependencies[dep_id].add(msg_id)
377 self.dependencies[dep_id].add(msg_id)
378
378
379 @logged
379 @logged
380 def submit_task(self, msg_id, raw_msg, follow=None, indices=None):
380 def submit_task(self, msg_id, raw_msg, follow=None, indices=None):
381 """Submit a task to any of a subset of our targets."""
381 """Submit a task to any of a subset of our targets."""
382 if indices:
382 if indices:
383 loads = [self.loads[i] for i in indices]
383 loads = [self.loads[i] for i in indices]
384 else:
384 else:
385 loads = self.loads
385 loads = self.loads
386 idx = self.scheme(loads)
386 idx = self.scheme(loads)
387 if indices:
387 if indices:
388 idx = indices[idx]
388 idx = indices[idx]
389 target = self.targets[idx]
389 target = self.targets[idx]
390 # print (target, map(str, msg[:3]))
390 # print (target, map(str, msg[:3]))
391 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
391 self.engine_stream.send(target, flags=zmq.SNDMORE, copy=False)
392 self.engine_stream.send_multipart(raw_msg, copy=False)
392 self.engine_stream.send_multipart(raw_msg, copy=False)
393 self.add_job(idx)
393 self.add_job(idx)
394 self.pending[target][msg_id] = (raw_msg, follow)
394 self.pending[target][msg_id] = (raw_msg, follow)
395 content = dict(msg_id=msg_id, engine_id=target)
395 content = dict(msg_id=msg_id, engine_id=target)
396 self.session.send(self.mon_stream, 'task_destination', content=content,
396 self.session.send(self.mon_stream, 'task_destination', content=content,
397 ident=['tracktask',self.session.session])
397 ident=['tracktask',self.session.session])
398
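# The subset-to-full-list index mapping in submit_task, restated on made-up
# values, with leastload standing in for self.scheme:
loads = [5, 1, 4]                       # loads of all engines
indices = [0, 2]                        # only engines 0 and 2 satisfy follow
sub_loads = [loads[i] for i in indices] # [5, 4]
idx = sub_loads.index(min(sub_loads))   # leastload over the subset -> 1
idx = indices[idx]                      # map back to the full list -> 2
assert idx == 2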
398
399 #-----------------------------------------------------------------------
399 #-----------------------------------------------------------------------
400 # Result Handling
400 # Result Handling
401 #-----------------------------------------------------------------------
401 #-----------------------------------------------------------------------
402 @logged
402 @logged
403 def dispatch_result(self, raw_msg):
403 def dispatch_result(self, raw_msg):
404 try:
404 try:
405 idents,msg = self.session.feed_identities(raw_msg, copy=False)
405 idents,msg = self.session.feed_identities(raw_msg, copy=False)
406 except Exception as e:
406 except Exception as e:
407 self.log.error("task::Invalid result: %s"%raw_msg)
407 self.log.error("task::Invalid result: %s"%raw_msg)
408 return
408 return
409 msg = self.session.unpack_message(msg, content=False, copy=False)
409 msg = self.session.unpack_message(msg, content=False, copy=False)
410 header = msg['header']
410 header = msg['header']
411 if header.get('dependencies_met', True):
411 if header.get('dependencies_met', True):
412 success = (header['status'] == 'ok')
412 success = (header['status'] == 'ok')
413 self.handle_result(idents, msg['parent_header'], raw_msg, success)
413 self.handle_result(idents, msg['parent_header'], raw_msg, success)
414 # send to Hub monitor
414 # send to Hub monitor
415 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
415 self.mon_stream.send_multipart(['outtask']+raw_msg, copy=False)
416 else:
416 else:
417 self.handle_unmet_dependency(idents, msg['parent_header'])
417 self.handle_unmet_dependency(idents, msg['parent_header'])
418
418
419 @logged
419 @logged
420 def handle_result(self, idents, parent, raw_msg, success=True):
420 def handle_result(self, idents, parent, raw_msg, success=True):
421 # first, relay result to client
421 # first, relay result to client
422 engine = idents[0]
422 engine = idents[0]
423 client = idents[1]
423 client = idents[1]
424 # swap_ids for XREP-XREP mirror
424 # swap_ids for XREP-XREP mirror
425 raw_msg[:2] = [client,engine]
425 raw_msg[:2] = [client,engine]
426 # print (map(str, raw_msg[:4]))
426 # print (map(str, raw_msg[:4]))
427 self.client_stream.send_multipart(raw_msg, copy=False)
427 self.client_stream.send_multipart(raw_msg, copy=False)
428 # now, update our data structures
428 # now, update our data structures
429 msg_id = parent['msg_id']
429 msg_id = parent['msg_id']
430 self.blacklist.pop(msg_id, None)
430 self.blacklist.pop(msg_id, None)
431 self.pending[engine].pop(msg_id)
431 self.pending[engine].pop(msg_id)
432 if success:
432 if success:
433 self.completed[engine].add(msg_id)
433 self.completed[engine].add(msg_id)
434 self.all_completed.add(msg_id)
434 self.all_completed.add(msg_id)
435 else:
435 else:
436 self.failed[engine].add(msg_id)
436 self.failed[engine].add(msg_id)
437 self.all_failed.add(msg_id)
437 self.all_failed.add(msg_id)
438 self.all_done.add(msg_id)
438 self.all_done.add(msg_id)
439 self.destinations[msg_id] = engine
439 self.destinations[msg_id] = engine
440
440
441 self.update_dependencies(msg_id, success)
441 self.update_dependencies(msg_id, success)
442
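# The XREP-XREP identity swap above, on plain lists (frame contents made
# up): an engine reply arrives prefixed [engine, client, ...] and must
# leave the client-facing socket prefixed [client, engine, ...].
raw_msg = ['engine-uuid', 'client-uuid', 'header', 'body']
raw_msg[:2] = [raw_msg[1], raw_msg[0]]
assert raw_msg == ['client-uuid', 'engine-uuid', 'header', 'body']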
442
443 @logged
443 @logged
444 def handle_unmet_dependency(self, idents, parent):
444 def handle_unmet_dependency(self, idents, parent):
445 engine = idents[0]
445 engine = idents[0]
446 msg_id = parent['msg_id']
446 msg_id = parent['msg_id']
447 if msg_id not in self.blacklist:
447 if msg_id not in self.blacklist:
448 self.blacklist[msg_id] = set()
448 self.blacklist[msg_id] = set()
449 self.blacklist[msg_id].add(engine)
449 self.blacklist[msg_id].add(engine)
450 raw_msg, follow = self.pending[engine].pop(msg_id) # pending stores (raw_msg, follow) 2-tuples
450 raw_msg, follow = self.pending[engine].pop(msg_id) # pending stores (raw_msg, follow) 2-tuples
451 if not self.maybe_run(msg_id, raw_msg, follow):
451 if not self.maybe_run(msg_id, raw_msg, follow):
452 # resubmit failed, put it back in our dependency tree
452 # resubmit failed, put it back in our dependency tree
453 self.save_unmet(msg_id, raw_msg, MET, follow, None)
453 self.save_unmet(msg_id, raw_msg, MET, follow, None)
455
455
456 @logged
456 @logged
457 def update_dependencies(self, dep_id, success=True):
457 def update_dependencies(self, dep_id, success=True):
458 """dep_id just finished. Update our dependency
458 """dep_id just finished. Update our dependency
459 table and submit any jobs that just became runnable."""
459 table and submit any jobs that just became runnable."""
460 # print ("\n\n***********")
460 # print ("\n\n***********")
461 # pprint (dep_id)
461 # pprint (dep_id)
462 # pprint (self.dependencies)
462 # pprint (self.dependencies)
463 # pprint (self.depending)
463 # pprint (self.depending)
464 # pprint (self.all_completed)
464 # pprint (self.all_completed)
465 # pprint (self.all_failed)
465 # pprint (self.all_failed)
466 # print ("\n\n***********\n\n")
466 # print ("\n\n***********\n\n")
467 if dep_id not in self.dependencies:
467 if dep_id not in self.dependencies:
468 return
468 return
469 jobs = self.dependencies.pop(dep_id)
469 jobs = self.dependencies.pop(dep_id)
470
470
471 for msg_id in jobs:
471 for msg_id in jobs:
472 raw_msg, after, follow, timeout = self.depending[msg_id]
472 raw_msg, after, follow, timeout = self.depending[msg_id]
473 # if dep_id in after:
473 # if dep_id in after:
474 # if after.mode == 'all' and (success or not after.success_only):
474 # if after.mode == 'all' and (success or not after.success_only):
475 # after.remove(dep_id)
475 # after.remove(dep_id)
476
476
477 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
477 if after.unreachable(self.all_failed) or follow.unreachable(self.all_failed):
478 self.fail_unreachable(msg_id)
478 self.fail_unreachable(msg_id)
479
479
480 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
480 elif after.check(self.all_completed, self.all_failed): # time deps met, maybe run
481 self.depending[msg_id][1] = MET
481 self.depending[msg_id][1] = MET
482 if self.maybe_run(msg_id, raw_msg, follow):
482 if self.maybe_run(msg_id, raw_msg, follow):
483
483
484 self.depending.pop(msg_id)
484 self.depending.pop(msg_id)
485 for mid in follow.union(after):
485 for mid in follow.union(after):
486 if mid in self.dependencies:
486 if mid in self.dependencies:
487 self.dependencies[mid].remove(msg_id)
487 self.dependencies[mid].remove(msg_id)
488
488
489 #----------------------------------------------------------------------
489 #----------------------------------------------------------------------
490 # methods to be overridden by subclasses
490 # methods to be overridden by subclasses
491 #----------------------------------------------------------------------
491 #----------------------------------------------------------------------
492
492
493 def add_job(self, idx):
493 def add_job(self, idx):
494 """Called after self.targets[idx] just got the job with header.
494 """Called after self.targets[idx] just got the job with header.
495 Override with subclasses. The default ordering is simple LRU.
495 Override with subclasses. The default ordering is simple LRU.
496 The default loads are the number of outstanding jobs."""
496 The default loads are the number of outstanding jobs."""
497 self.loads[idx] += 1
497 self.loads[idx] += 1
498 for lis in (self.targets, self.loads):
498 for lis in (self.targets, self.loads):
499 lis.append(lis.pop(idx))
499 lis.append(lis.pop(idx))
500
500
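# The default LRU bookkeeping in add_job, traced on made-up values:
targets, loads = ['a', 'b', 'c'], [0, 1, 0]
idx = 0                                 # engine 'a' just received a job
loads[idx] += 1
for lis in (targets, loads):
    lis.append(lis.pop(idx))
assert targets == ['b', 'c', 'a'] and loads == [1, 0, 1]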
501
501
502 def finish_job(self, idx):
502 def finish_job(self, idx):
503 """Called after self.targets[idx] just finished a job.
503 """Called after self.targets[idx] just finished a job.
504 Override with subclasses."""
504 Override with subclasses."""
505 self.loads[idx] -= 1
505 self.loads[idx] -= 1
506
506
507
507
508
508
509 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
509 def launch_scheduler(in_addr, out_addr, mon_addr, not_addr, config=None, logname='ZMQ', log_addr=None, loglevel=logging.DEBUG, scheme='weighted'):
510 from zmq.eventloop import ioloop
510 from zmq.eventloop import ioloop
511 from zmq.eventloop.zmqstream import ZMQStream
511 from zmq.eventloop.zmqstream import ZMQStream
512
512
513 ctx = zmq.Context()
513 ctx = zmq.Context()
514 loop = ioloop.IOLoop()
514 loop = ioloop.IOLoop()
515
515 print (in_addr, out_addr, mon_addr, not_addr)
516 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
516 ins = ZMQStream(ctx.socket(zmq.XREP),loop)
517 ins.bind(in_addr)
517 ins.bind(in_addr)
518 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
518 outs = ZMQStream(ctx.socket(zmq.XREP),loop)
519 outs.bind(out_addr)
519 outs.bind(out_addr)
520 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
520 mons = ZMQStream(ctx.socket(zmq.PUB),loop)
521 mons.connect(mon_addr)
521 mons.connect(mon_addr)
522 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
522 nots = ZMQStream(ctx.socket(zmq.SUB),loop)
523 nots.setsockopt(zmq.SUBSCRIBE, '')
523 nots.setsockopt(zmq.SUBSCRIBE, '')
524 nots.connect(not_addr)
524 nots.connect(not_addr)
525
525
526 scheme = globals().get(scheme, None)
526 scheme = globals().get(scheme, None)
527 # setup logging
527 # setup logging
528 if log_addr:
528 if log_addr:
529 connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
529 connect_logger(logname, ctx, log_addr, root="scheduler", loglevel=loglevel)
530 else:
530 else:
531 local_logger(logname, loglevel)
531 local_logger(logname, loglevel)
532
532
533 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
533 scheduler = TaskScheduler(client_stream=ins, engine_stream=outs,
534 mon_stream=mons, notifier_stream=nots,
534 mon_stream=mons, notifier_stream=nots,
535 scheme=scheme, loop=loop, logname=logname)
535 scheme=scheme, loop=loop, logname=logname,
536 config=config)
536 scheduler.start()
537 scheduler.start()
537 try:
538 try:
538 loop.start()
539 loop.start()
539 except KeyboardInterrupt:
540 except KeyboardInterrupt:
540 print ("interrupted, exiting...", file=sys.__stderr__)
541 print ("interrupted, exiting...", file=sys.__stderr__)
541
542
542
543 if __name__ == '__main__':
544 iface = 'tcp://127.0.0.1:%i'
545 launch_scheduler(iface%12345,iface%1236,iface%12347,iface%12348)
@@ -1,119 +1,119 b''
1 """some generic utilities"""
1 """some generic utilities"""
2 import re
2 import re
3 import socket
3 import socket
4
4
5 class ReverseDict(dict):
5 class ReverseDict(dict):
6 """simple double-keyed subset of dict methods."""
6 """simple double-keyed subset of dict methods."""
7
7
8 def __init__(self, *args, **kwargs):
8 def __init__(self, *args, **kwargs):
9 dict.__init__(self, *args, **kwargs)
9 dict.__init__(self, *args, **kwargs)
10 self._reverse = dict()
10 self._reverse = dict()
11 for key, value in self.iteritems():
11 for key, value in self.iteritems():
12 self._reverse[value] = key
12 self._reverse[value] = key
13
13
14 def __getitem__(self, key):
14 def __getitem__(self, key):
15 try:
15 try:
16 return dict.__getitem__(self, key)
16 return dict.__getitem__(self, key)
17 except KeyError:
17 except KeyError:
18 return self._reverse[key]
18 return self._reverse[key]
19
19
20 def __setitem__(self, key, value):
20 def __setitem__(self, key, value):
21 if key in self._reverse:
21 if key in self._reverse:
22 raise KeyError("Can't have key %r on both sides!"%key)
22 raise KeyError("Can't have key %r on both sides!"%key)
23 dict.__setitem__(self, key, value)
23 dict.__setitem__(self, key, value)
24 self._reverse[value] = key
24 self._reverse[value] = key
25
25
26 def pop(self, key):
26 def pop(self, key):
27 value = dict.pop(self, key)
27 value = dict.pop(self, key)
28 self._reverse.pop(value)
28 self._reverse.pop(value)
29 return value
29 return value
30
30
31 def get(self, key, default=None):
31 def get(self, key, default=None):
32 try:
32 try:
33 return self[key]
33 return self[key]
34 except KeyError:
34 except KeyError:
35 return default
35 return default
36
36
37
37
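# Hedged usage illustration of ReverseDict (not part of this changeset; the
# name _reversedict_demo and the example values are made up):

def _reversedict_demo():
    rd = ReverseDict()
    rd['engine-0'] = 'tcp://127.0.0.1:10101'
    assert rd['engine-0'] == 'tcp://127.0.0.1:10101'   # forward lookup
    assert rd['tcp://127.0.0.1:10101'] == 'engine-0'   # reverse lookup
    rd.pop('engine-0')   # removes both the forward and the reverse entry
    assert rd.get('tcp://127.0.0.1:10101') is None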
38 def validate_url(url):
38 def validate_url(url):
39 """validate a url for zeromq"""
39 """validate a url for zeromq"""
40 if not isinstance(url, basestring):
40 if not isinstance(url, basestring):
41 raise TypeError("url must be a string, not %r"%type(url))
41 raise TypeError("url must be a string, not %r"%type(url))
42 url = url.lower()
42 url = url.lower()
43
43
44 proto_addr = url.split('://')
44 proto_addr = url.split('://')
45 assert len(proto_addr) == 2, 'Invalid url: %r'%url
45 assert len(proto_addr) == 2, 'Invalid url: %r'%url
46 proto, addr = proto_addr
46 proto, addr = proto_addr
47 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
47 assert proto in ['tcp','pgm','epgm','ipc','inproc'], "Invalid protocol: %r"%proto
48
48
49 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
49 # domain pattern adapted from http://www.regexlib.com/REDetails.aspx?regexp_id=391
50 # author: Remi Sabourin
50 # author: Remi Sabourin
51 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
51 pat = re.compile(r'^([\w\d]([\w\d\-]{0,61}[\w\d])?\.)*[\w\d]([\w\d\-]{0,61}[\w\d])?$')
52
52
53 if proto == 'tcp':
53 if proto == 'tcp':
54 lis = addr.split(':')
54 lis = addr.split(':')
55 assert len(lis) == 2, 'Invalid url: %r'%url
55 assert len(lis) == 2, 'Invalid url: %r'%url
56 addr,s_port = lis
56 addr,s_port = lis
57 try:
57 try:
58 port = int(s_port)
58 port = int(s_port)
59 except ValueError:
59 except ValueError:
60 raise AssertionError("Invalid port %r in url: %r"%(s_port, url))
60 raise AssertionError("Invalid port %r in url: %r"%(s_port, url))
61
61
62 assert pat.match(addr) is not None, 'Invalid url: %r'%url
62 assert pat.match(addr) is not None, 'Invalid url: %r'%url
63
63
64 else:
64 else:
65 # only validate tcp urls currently
65 # only validate tcp urls currently
66 pass
66 pass
67
67
68 return True
68 return True
69
69
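# Hedged illustration of validate_url (not part of this changeset; the name
# _validate_url_demo and the example urls are made up):

def _validate_url_demo():
    assert validate_url('tcp://127.0.0.1:10101')
    assert validate_url('ipc:///tmp/scheduler')  # only tcp is fully checked
    try:
        validate_url('tcp://127.0.0.1')  # tcp url without a port
    except AssertionError:
        pass  # expected: 'Invalid url' assertion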
70
70
71 def validate_url_container(container):
71 def validate_url_container(container):
72 """validate a potentially nested collection of urls."""
72 """validate a potentially nested collection of urls."""
73 if isinstance(container, basestring):
73 if isinstance(container, basestring):
74 url = container
74 url = container
75 return validate_url(url)
75 return validate_url(url)
76 elif isinstance(container, dict):
76 elif isinstance(container, dict):
77 container = container.itervalues()
77 container = container.itervalues()
78
78
79 for element in container:
79 for element in container:
80 validate_url_container(element)
80 validate_url_container(element)
81
81
82
82
83 def split_url(url):
83 def split_url(url):
84 """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
84 """split a zmq url (tcp://ip:port) into ('tcp','ip','port')."""
85 proto_addr = url.split('://')
85 proto_addr = url.split('://')
86 assert len(proto_addr) == 2, 'Invalid url: %r'%url
86 assert len(proto_addr) == 2, 'Invalid url: %r'%url
87 proto, addr = proto_addr
87 proto, addr = proto_addr
88 lis = addr.split(':')
88 lis = addr.split(':')
89 assert len(lis) == 2, 'Invalid url: %r'%url
89 assert len(lis) == 2, 'Invalid url: %r'%url
90 addr,s_port = lis
90 addr,s_port = lis
91 return proto,addr,s_port
91 return proto,addr,s_port
92
92
93 def disambiguate_ip_address(ip, location=None):
93 def disambiguate_ip_address(ip, location=None):
94 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
94 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
95 ones, based on the location (default interpretation of location is localhost)."""
95 ones, based on the location (default interpretation of location is localhost)."""
96 if ip in ('0.0.0.0', '*'):
96 if ip in ('0.0.0.0', '*'):
97 external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
97 external_ips = socket.gethostbyname_ex(socket.gethostname())[2]
98 if location is None or location in external_ips:
98 if location is None or location in external_ips:
99 ip='127.0.0.1'
99 ip='127.0.0.1'
100 elif external_ips:
100 elif location:
101 ip=external_ips[0]
101 return location
102 return ip
102 return ip
103
103
104 def disambiguate_url(url, location=None):
104 def disambiguate_url(url, location=None):
105 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
105 """turn multi-ip interfaces '0.0.0.0' and '*' into connectable
106 ones, based on the location (default interpretation is localhost).
106 ones, based on the location (default interpretation is localhost).
107
107
108 This is for zeromq urls, such as tcp://*:10101."""
108 This is for zeromq urls, such as tcp://*:10101."""
109 try:
109 try:
110 proto,ip,port = split_url(url)
110 proto,ip,port = split_url(url)
111 except AssertionError:
111 except AssertionError:
112 # probably not tcp url; could be ipc, etc.
112 # probably not tcp url; could be ipc, etc.
113 return url
113 return url
114
114
115 ip = disambiguate_ip_address(ip,location)
115 ip = disambiguate_ip_address(ip,location)
116
116
117 return "%s://%s:%s"%(proto,ip,port)
117 return "%s://%s:%s"%(proto,ip,port)
118
118
119
119
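# Hedged illustration of disambiguate_url (not part of this changeset; the
# name _disambiguate_demo and the addresses are made up):

def _disambiguate_demo():
    # with no location, multi-ip interfaces resolve to localhost:
    assert disambiguate_url('tcp://*:10101') == 'tcp://127.0.0.1:10101'
    # non-tcp urls pass through unchanged:
    assert disambiguate_url('ipc:///tmp/sock') == 'ipc:///tmp/sock'
    # with a remote location (assumed not to be one of this machine's own
    # IPs), the location itself is substituted:
    print disambiguate_url('tcp://0.0.0.0:10101', '10.0.0.5')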
@@ -1,355 +1,355 b''
1 """Views of remote engines"""
1 """Views of remote engines"""
2 #-----------------------------------------------------------------------------
2 #-----------------------------------------------------------------------------
3 # Copyright (C) 2010 The IPython Development Team
3 # Copyright (C) 2010 The IPython Development Team
4 #
4 #
5 # Distributed under the terms of the BSD License. The full license is in
5 # Distributed under the terms of the BSD License. The full license is in
6 # the file COPYING, distributed as part of this software.
6 # the file COPYING, distributed as part of this software.
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8
8
9 #-----------------------------------------------------------------------------
9 #-----------------------------------------------------------------------------
10 # Imports
10 # Imports
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 from IPython.external.decorator import decorator
13 from IPython.external.decorator import decorator
14 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
14 from IPython.zmq.parallel.remotefunction import ParallelFunction, parallel
15
15
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17 # Decorators
17 # Decorators
18 #-----------------------------------------------------------------------------
18 #-----------------------------------------------------------------------------
19
19
20 @decorator
20 @decorator
21 def myblock(f, self, *args, **kwargs):
21 def myblock(f, self, *args, **kwargs):
22 """override client.block with self.block during a call"""
22 """override client.block with self.block during a call"""
23 block = self.client.block
23 block = self.client.block
24 self.client.block = self.block
24 self.client.block = self.block
25 try:
25 try:
26 ret = f(self, *args, **kwargs)
26 ret = f(self, *args, **kwargs)
27 finally:
27 finally:
28 self.client.block = block
28 self.client.block = block
29 return ret
29 return ret
30
30
31 @decorator
31 @decorator
32 def save_ids(f, self, *args, **kwargs):
32 def save_ids(f, self, *args, **kwargs):
33 """Keep our history and outstanding attributes up to date after a method call."""
33 """Keep our history and outstanding attributes up to date after a method call."""
34 n_previous = len(self.client.history)
34 n_previous = len(self.client.history)
35 ret = f(self, *args, **kwargs)
35 ret = f(self, *args, **kwargs)
36 nmsgs = len(self.client.history) - n_previous
36 nmsgs = len(self.client.history) - n_previous
37 msg_ids = self.client.history[-nmsgs:]
37 msg_ids = self.client.history[-nmsgs:]
38 self.history.extend(msg_ids)
38 self.history.extend(msg_ids)
39 map(self.outstanding.add, msg_ids)
39 map(self.outstanding.add, msg_ids)
40 return ret
40 return ret
41
41
42 @decorator
42 @decorator
43 def sync_results(f, self, *args, **kwargs):
43 def sync_results(f, self, *args, **kwargs):
44 """sync relevant results from self.client to our results attribute."""
44 """sync relevant results from self.client to our results attribute."""
45 ret = f(self, *args, **kwargs)
45 ret = f(self, *args, **kwargs)
46 delta = self.outstanding.difference(self.client.outstanding)
46 delta = self.outstanding.difference(self.client.outstanding)
47 completed = self.outstanding.intersection(delta)
47 completed = self.outstanding.intersection(delta)
48 self.outstanding = self.outstanding.difference(completed)
48 self.outstanding = self.outstanding.difference(completed)
49 for msg_id in completed:
49 for msg_id in completed:
50 self.results[msg_id] = self.client.results[msg_id]
50 self.results[msg_id] = self.client.results[msg_id]
51 return ret
51 return ret
52
52
53 @decorator
53 @decorator
54 def spin_after(f, self, *args, **kwargs):
54 def spin_after(f, self, *args, **kwargs):
55 """call spin after the method."""
55 """call spin after the method."""
56 ret = f(self, *args, **kwargs)
56 ret = f(self, *args, **kwargs)
57 self.spin()
57 self.spin()
58 return ret
58 return ret
59
59
60 #-----------------------------------------------------------------------------
60 #-----------------------------------------------------------------------------
61 # Classes
61 # Classes
62 #-----------------------------------------------------------------------------
62 #-----------------------------------------------------------------------------
63
63
64 class View(object):
64 class View(object):
65 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
65 """Base View class for more convenint apply(f,*args,**kwargs) syntax via attributes.
66
66
67 Don't use this class, use subclasses.
67 Don't use this class, use subclasses.
68 """
68 """
69 _targets = None
69 _targets = None
70 block=None
70 block=None
71 bound=None
71 bound=None
72 history=None
72 history=None
73
73
74 def __init__(self, client, targets=None):
74 def __init__(self, client, targets=None):
75 self.client = client
75 self.client = client
76 self._targets = targets
76 self._targets = targets
77 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
77 self._ntargets = 1 if isinstance(targets, (int,type(None))) else len(targets)
78 self.block = client.block
78 self.block = client.block
79 self.bound=False
79 self.bound=False
80 self.history = []
80 self.history = []
81 self.outstanding = set()
81 self.outstanding = set()
82 self.results = {}
82 self.results = {}
83
83
84 def __repr__(self):
84 def __repr__(self):
85 strtargets = str(self._targets)
85 strtargets = str(self._targets)
86 if len(strtargets) > 16:
86 if len(strtargets) > 16:
87 strtargets = strtargets[:12]+'...]'
87 strtargets = strtargets[:12]+'...]'
88 return "<%s %s>"%(self.__class__.__name__, strtargets)
88 return "<%s %s>"%(self.__class__.__name__, strtargets)
89
89
90 @property
90 @property
91 def targets(self):
91 def targets(self):
92 return self._targets
92 return self._targets
93
93
94 @targets.setter
94 @targets.setter
95 def targets(self, value):
95 def targets(self, value):
96 self._targets = value
96 self._targets = value
97 # raise AttributeError("Cannot set my targets argument after construction!")
97 # raise AttributeError("Cannot set my targets argument after construction!")
98
98
99 @sync_results
99 @sync_results
100 def spin(self):
100 def spin(self):
101 """spin the client, and sync"""
101 """spin the client, and sync"""
102 self.client.spin()
102 self.client.spin()
103
103
104 @sync_results
104 @sync_results
105 @save_ids
105 @save_ids
106 def apply(self, f, *args, **kwargs):
106 def apply(self, f, *args, **kwargs):
107 """calls f(*args, **kwargs) on remote engines, returning the result.
107 """calls f(*args, **kwargs) on remote engines, returning the result.
108
108
109 This method does not involve the engine's namespace.
109 This method does not involve the engine's namespace.
110
110
111 if self.block is False:
111 if self.block is False:
112 returns msg_id
112 returns msg_id
113 else:
113 else:
114 returns actual result of f(*args, **kwargs)
114 returns actual result of f(*args, **kwargs)
115 """
115 """
116 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=self.bound)
116 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=self.bound)
117
117
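# Hedged illustration (not part of this changeset) of the blocking toggle
# described in the docstring above; `v` stands for any View with a live client:
#
#     v.block = False
#     msg_id = v.apply(lambda x: 2*x, 21)   # returns immediately with msg_id
#     v.block = True
#     v.apply(lambda x: 2*x, 21)            # waits; returns 42 per target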
118 @save_ids
118 @save_ids
119 def apply_async(self, f, *args, **kwargs):
119 def apply_async(self, f, *args, **kwargs):
120 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
120 """calls f(*args, **kwargs) on remote engines in a nonblocking manner.
121
121
122 This method does not involve the engine's namespace.
122 This method does not involve the engine's namespace.
123
123
124 returns msg_id
124 returns msg_id
125 """
125 """
126 return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
126 return self.client.apply(f,args,kwargs, block=False, targets=self.targets, bound=False)
127
127
128 @spin_after
128 @spin_after
129 @save_ids
129 @save_ids
130 def apply_sync(self, f, *args, **kwargs):
130 def apply_sync(self, f, *args, **kwargs):
131 """calls f(*args, **kwargs) on remote engines in a blocking manner,
131 """calls f(*args, **kwargs) on remote engines in a blocking manner,
132 returning the result.
132 returning the result.
133
133
134 This method does not involve the engine's namespace.
134 This method does not involve the engine's namespace.
135
135
136 returns: actual result of f(*args, **kwargs)
136 returns: actual result of f(*args, **kwargs)
137 """
137 """
138 return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
138 return self.client.apply(f,args,kwargs, block=True, targets=self.targets, bound=False)
139
139
140 @sync_results
140 @sync_results
141 @save_ids
141 @save_ids
142 def apply_bound(self, f, *args, **kwargs):
142 def apply_bound(self, f, *args, **kwargs):
143 """calls f(*args, **kwargs) bound to engine namespace(s).
143 """calls f(*args, **kwargs) bound to engine namespace(s).
144
144
145 if self.block is False:
145 if self.block is False:
146 returns msg_id
146 returns msg_id
147 else:
147 else:
148 returns actual result of f(*args, **kwargs)
148 returns actual result of f(*args, **kwargs)
149
149
150 This method has access to the targets' globals
150 This method has access to the targets' globals
151
151
152 """
152 """
153 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
153 return self.client.apply(f, args, kwargs, block=self.block, targets=self.targets, bound=True)
154
154
155 @sync_results
155 @sync_results
156 @save_ids
156 @save_ids
157 def apply_async_bound(self, f, *args, **kwargs):
157 def apply_async_bound(self, f, *args, **kwargs):
158 """calls f(*args, **kwargs) bound to engine namespace(s)
158 """calls f(*args, **kwargs) bound to engine namespace(s)
159 in a nonblocking manner.
159 in a nonblocking manner.
160
160
161 returns: msg_id
161 returns: msg_id
162
162
163 This method has access to the targets' globals
163 This method has access to the targets' globals
164
164
165 """
165 """
166 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
166 return self.client.apply(f, args, kwargs, block=False, targets=self.targets, bound=True)
167
167
168 @spin_after
168 @spin_after
169 @save_ids
169 @save_ids
170 def apply_sync_bound(self, f, *args, **kwargs):
170 def apply_sync_bound(self, f, *args, **kwargs):
171 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
171 """calls f(*args, **kwargs) bound to engine namespace(s), waiting for the result.
172
172
173 returns: actual result of f(*args, **kwargs)
173 returns: actual result of f(*args, **kwargs)
174
174
175 This method has access to the targets' globals
175 This method has access to the targets' globals
176
176
177 """
177 """
178 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
178 return self.client.apply(f, args, kwargs, block=True, targets=self.targets, bound=True)
179
179
180 @spin_after
180 @spin_after
181 @save_ids
181 @save_ids
182 def map(self, f, *sequences):
182 def map(self, f, *sequences):
183 """Parallel version of builtin `map`, using this view's engines."""
183 """Parallel version of builtin `map`, using this view's engines."""
184 if isinstance(self.targets, int):
184 if isinstance(self.targets, int):
185 targets = [self.targets]
185 targets = [self.targets]
186 else:
186 else:
187 targets = self.targets
187 targets = self.targets
188 pf = ParallelFunction(self.client, f, block=self.block,
188 pf = ParallelFunction(self.client, f, block=self.block,
189 bound=True, targets=targets)
189 bound=True, targets=targets)
190 return pf.map(*sequences)
190 return pf.map(*sequences)
191
191
192 def parallel(self, bound=True, block=True):
192 def parallel(self, bound=True, block=True):
193 """Decorator for making a ParallelFunction"""
193 """Decorator for making a ParallelFunction"""
194 return parallel(self.client, bound=bound, targets=self.targets, block=block)
194 return parallel(self.client, bound=bound, targets=self.targets, block=block)
195
195
196 def abort(self, msg_ids=None, block=None):
196 def abort(self, msg_ids=None, block=None):
197 """Abort jobs on my engines.
197 """Abort jobs on my engines.
198
198
199 Parameters
199 Parameters
200 ----------
200 ----------
201
201
202 msg_ids : None, str, list of strs, optional
202 msg_ids : None, str, list of strs, optional
203 if None: abort all jobs.
203 if None: abort all jobs.
204 else: abort specific msg_id(s).
204 else: abort specific msg_id(s).
205 """
205 """
206 block = block if block is not None else self.block
206 block = block if block is not None else self.block
207 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
207 return self.client.abort(msg_ids=msg_ids, targets=self.targets, block=block)
208
208
209 def queue_status(self, verbose=False):
209 def queue_status(self, verbose=False):
210 """Fetch the Queue status of my engines"""
210 """Fetch the Queue status of my engines"""
211 return self.client.queue_status(targets=self.targets, verbose=verbose)
211 return self.client.queue_status(targets=self.targets, verbose=verbose)
212
212
213 def purge_results(self, msg_ids=[], targets=[]):
213 def purge_results(self, msg_ids=[], targets=[]):
214 """Instruct the controller to forget specific results."""
214 """Instruct the controller to forget specific results."""
215 if targets is None or targets == 'all':
215 if targets is None or targets == 'all':
216 targets = self.targets
216 targets = self.targets
217 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
217 return self.client.purge_results(msg_ids=msg_ids, targets=targets)
218
218
219
219
220
220
221 class DirectView(View):
221 class DirectView(View):
222 """Direct Multiplexer View of one or more engines.
222 """Direct Multiplexer View of one or more engines.
223
223
224 These are created via indexed access to a client:
224 These are created via indexed access to a client:
225
225
226 >>> dv_1 = client[1]
226 >>> dv_1 = client[1]
227 >>> dv_all = client[:]
227 >>> dv_all = client[:]
228 >>> dv_even = client[::2]
228 >>> dv_even = client[::2]
229 >>> dv_some = client[1:3]
229 >>> dv_some = client[1:3]
230
230
231 This object also provides dictionary-style access to the engines' namespaces.
231 This object also provides dictionary-style access to the engines' namespaces.
232
232
233 """
233 """
234
234
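# Hedged illustration (not part of this changeset) of the dictionary-style
# access implemented by __getitem__/__setitem__ further down:
#
#     dv = client[:]   # a DirectView on all engines
#     dv['a'] = 5      # push a=5 to every engine   (__setitem__ -> update)
#     dv['a']          # pull 'a' from each engine  (__getitem__ -> get)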
235 @sync_results
235 @sync_results
236 @save_ids
236 @save_ids
237 def execute(self, code, block=None):
237 def execute(self, code, block=None):
238 """execute some code on my targets."""
238 """execute some code on my targets."""
239 return self.client.execute(code, block=self.block if block is None else block, targets=self.targets)
239 return self.client.execute(code, block=self.block if block is None else block, targets=self.targets)
240
240
241 def update(self, ns):
241 def update(self, ns):
242 """update remote namespace with dict `ns`"""
242 """update remote namespace with dict `ns`"""
243 return self.client.push(ns, targets=self.targets, block=self.block)
243 return self.client.push(ns, targets=self.targets, block=self.block)
244
244
245 push = update
245 push = update
246
246
247 def get(self, key_s):
247 def get(self, key_s):
248 """get object(s) by `key_s` from remote namespace
248 """get object(s) by `key_s` from remote namespace
249 will return one object if it is a key.
249 will return one object if it is a key.
250 It also takes a list of keys, and will return a list of objects."""
250 It also takes a list of keys, and will return a list of objects."""
251 # block = block if block is not None else self.block
251 # block = block if block is not None else self.block
252 return self.client.pull(key_s, block=True, targets=self.targets)
252 return self.client.pull(key_s, block=True, targets=self.targets)
253
253
254 @sync_results
254 @sync_results
255 @save_ids
255 @save_ids
256 def pull(self, key_s, block=None):
256 def pull(self, key_s, block=None):
257 """get object(s) by `key_s` from remote namespace;
257 """get object(s) by `key_s` from remote namespace;
258 returns a single object if `key_s` is a key,
258 returns a single object if `key_s` is a key,
259 or a list of objects if `key_s` is a list of keys."""
259 or a list of objects if `key_s` is a list of keys."""
260 block = block if block is not None else self.block
260 block = block if block is not None else self.block
261 return self.client.pull(key_s, block=block, targets=self.targets)
261 return self.client.pull(key_s, block=block, targets=self.targets)
262
262
263 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
263 def scatter(self, key, seq, dist='b', flatten=False, targets=None, block=None):
264 """
264 """
265 Partition a Python sequence and send the partitions to a set of engines.
265 Partition a Python sequence and send the partitions to a set of engines.
266 """
266 """
267 block = block if block is not None else self.block
267 block = block if block is not None else self.block
268 targets = targets if targets is not None else self.targets
268 targets = targets if targets is not None else self.targets
269
269
270 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
270 return self.client.scatter(key, seq, dist=dist, flatten=flatten,
271 targets=targets, block=block)
271 targets=targets, block=block)
272
272
273 @sync_results
273 @sync_results
274 @save_ids
274 @save_ids
275 def gather(self, key, dist='b', targets=None, block=None):
275 def gather(self, key, dist='b', targets=None, block=None):
276 """
276 """
277 Gather a partitioned sequence on a set of engines as a single local seq.
277 Gather a partitioned sequence on a set of engines as a single local seq.
278 """
278 """
279 block = block if block is not None else self.block
279 block = block if block is not None else self.block
280 targets = targets if targets is not None else self.targets
280 targets = targets if targets is not None else self.targets
281
281
282 return self.client.gather(key, dist=dist, targets=targets, block=block)
282 return self.client.gather(key, dist=dist, targets=targets, block=block)
283
283
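# Hedged illustration (not part of this changeset) of a scatter/gather round
# trip, assuming a client connected to a running cluster:
#
#     dv = client[:]
#     dv.scatter('x', range(16))            # partition range(16) across engines
#     dv.execute('y = [i**2 for i in x]')   # transform each partition remotely
#     dv.gather('y')                        # reassembles [0, 1, 4, ..., 225]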
284 def __getitem__(self, key):
284 def __getitem__(self, key):
285 return self.get(key)
285 return self.get(key)
286
286
287 def __setitem__(self,key, value):
287 def __setitem__(self,key, value):
288 self.update({key:value})
288 self.update({key:value})
289
289
290 def clear(self, block=None):
290 def clear(self, block=None):
291 """Clear the remote namespaces on my engines."""
291 """Clear the remote namespaces on my engines."""
292 block = block if block is not None else self.block
292 block = block if block is not None else self.block
293 return self.client.clear(targets=self.targets, block=block)
293 return self.client.clear(targets=self.targets, block=block)
294
294
295 def kill(self, block=None):
295 def kill(self, block=None):
296 """Kill my engines."""
296 """Kill my engines."""
297 block = block if block is not None else self.block
297 block = block if block is not None else self.block
298 return self.client.kill(targets=self.targets, block=block)
298 return self.client.kill(targets=self.targets, block=block)
299
299
300 #----------------------------------------
300 #----------------------------------------
301 # activate for %px,%autopx magics
301 # activate for %px,%autopx magics
302 #----------------------------------------
302 #----------------------------------------
303 def activate(self):
303 def activate(self):
304 """Make this `View` active for parallel magic commands.
304 """Make this `View` active for parallel magic commands.
305
305
306 IPython has a magic command syntax to work with `MultiEngineClient` objects.
306 IPython has a magic command syntax to work with `MultiEngineClient` objects.
307 In a given IPython session there is a single active one. While
307 In a given IPython session there is a single active one. While
308 there can be many `Views` created and used by the user,
308 there can be many `Views` created and used by the user,
309 there is only one active one. The active `View` is used whenever
309 there is only one active one. The active `View` is used whenever
310 the magic commands %px and %autopx are used.
310 the magic commands %px and %autopx are used.
311
311
312 The activate() method is called on a given `View` to make it
312 The activate() method is called on a given `View` to make it
313 active. Once this has been done, the magic commands can be used.
313 active. Once this has been done, the magic commands can be used.
314 """
314 """
315
315
316 try:
316 try:
317 # This is injected into __builtins__.
317 # This is injected into __builtins__.
318 ip = get_ipython()
318 ip = get_ipython()
319 except NameError:
319 except NameError:
320 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
320 print "The IPython parallel magics (%result, %px, %autopx) only work within IPython."
321 else:
321 else:
322 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
322 pmagic = ip.plugin_manager.get_plugin('parallelmagic')
323 if pmagic is not None:
323 if pmagic is not None:
324 pmagic.active_multiengine_client = self
324 pmagic.active_multiengine_client = self
325 else:
325 else:
326 print "You must first load the parallelmagic extension " \
326 print "You must first load the parallelmagic extension " \
327 "by doing '%load_ext parallelmagic'"
327 "by doing '%load_ext parallelmagic'"
328
328
329
329
330 class LoadBalancedView(View):
330 class LoadBalancedView(View):
331 """An engine-agnostic View that only executes via the Task queue.
331 """An engine-agnostic View that only executes via the Task queue.
332
332
333 Typically created via:
333 Typically created via:
334
334
335 >>> lbv = client[None]
335 >>> lbv = client[None]
336 <LoadBalancedView tcp://127.0.0.1:12345>
336 <LoadBalancedView tcp://127.0.0.1:12345>
337
337
338 but can also be created with:
338 but can also be created with:
339
339
340 >>> lbc = LoadBalancedView(client)
340 >>> lbc = LoadBalancedView(client)
341
341
342 TODO: allow subset of engines across which to balance.
342 TODO: allow subset of engines across which to balance.
343 """
343 """
344 def __repr__(self):
344 def __repr__(self):
345 return "<%s %s>"%(self.__class__.__name__, self.client._addr)
345 return "<%s %s>"%(self.__class__.__name__, self.client._config['url'])
346
346
347 @property
347 @property
348 def targets(self):
348 def targets(self):
349 return None
349 return None
350
350
351 @targets.setter
351 @targets.setter
352 def targets(self, value):
352 def targets(self, value):
353 raise AttributeError("Cannot set targets for LoadBalancedView!")
353 raise AttributeError("Cannot set targets for LoadBalancedView!")
354
354
355 No newline at end of file
355
@@ -1,290 +1,290 b''
1 =================
1 =================
2 Parallel examples
2 Parallel examples
3 =================
3 =================
4
4
5 .. note::
5 .. note::
6
6
7 Performance numbers from ``IPython.kernel``, not newparallel
7 Performance numbers from ``IPython.kernel``, not newparallel.
8
8
9 In this section we describe two more involved examples of using an IPython
9 In this section we describe two more involved examples of using an IPython
10 cluster to perform a parallel computation. In these examples, we will be using
10 cluster to perform a parallel computation. In these examples, we will be using
11 IPython's "pylab" mode, which enables interactive plotting using the
11 IPython's "pylab" mode, which enables interactive plotting using the
12 Matplotlib package. IPython can be started in this mode by typing::
12 Matplotlib package. IPython can be started in this mode by typing::
13
13
14 ipython --pylab
14 ipython --pylab
15
15
16 at the system command line. If this prints an error message, you will
16 at the system command line. If this prints an error message, you will
17 need to install the default profiles from within IPython by doing,
17 need to install the default profiles from within IPython by doing,
18
18
19 .. sourcecode:: ipython
19 .. sourcecode:: ipython
20
20
21 In [1]: %install_profiles
21 In [1]: %install_profiles
22
22
23 and then restarting IPython.
23 and then restarting IPython.
24
24
25 150 million digits of pi
25 150 million digits of pi
26 ========================
26 ========================
27
27
28 In this example we would like to study the distribution of digits in the
28 In this example we would like to study the distribution of digits in the
29 number pi (in base 10). While it is not known if pi is a normal number (a
29 number pi (in base 10). While it is not known if pi is a normal number (a
30 number is normal in base 10 if 0-9 occur with equal likelihood) numerical
30 number is normal in base 10 if 0-9 occur with equal likelihood) numerical
31 investigations suggest that it is. We will begin with a serial calculation on
31 investigations suggest that it is. We will begin with a serial calculation on
32 10,000 digits of pi and then perform a parallel calculation involving 150
32 10,000 digits of pi and then perform a parallel calculation involving 150
33 million digits.
33 million digits.
34
34
35 In both the serial and parallel calculation we will be using functions defined
35 In both the serial and parallel calculation we will be using functions defined
36 in the :file:`pidigits.py` file, which is available in the
36 in the :file:`pidigits.py` file, which is available in the
37 :file:`docs/examples/kernel` directory of the IPython source distribution.
37 :file:`docs/examples/newparallel` directory of the IPython source distribution.
38 These functions provide basic facilities for working with the digits of pi and
38 These functions provide basic facilities for working with the digits of pi and
39 can be loaded into IPython by putting :file:`pidigits.py` in your current
39 can be loaded into IPython by putting :file:`pidigits.py` in your current
40 working directory and then doing:
40 working directory and then doing:
41
41
42 .. sourcecode:: ipython
42 .. sourcecode:: ipython
43
43
44 In [1]: run pidigits.py
44 In [1]: run pidigits.py
45
45
46 Serial calculation
46 Serial calculation
47 ------------------
47 ------------------
48
48
49 For the serial calculation, we will use SymPy (http://www.sympy.org) to
49 For the serial calculation, we will use `SymPy <http://www.sympy.org>`_ to
50 calculate 10,000 digits of pi and then look at the frequencies of the digits
50 calculate 10,000 digits of pi and then look at the frequencies of the digits
51 0-9. Out of 10,000 digits, we expect each digit to occur 1,000 times. While
51 0-9. Out of 10,000 digits, we expect each digit to occur 1,000 times. While
52 SymPy is capable of calculating many more digits of pi, our purpose here is to
52 SymPy is capable of calculating many more digits of pi, our purpose here is to
53 set the stage for the much larger parallel calculation.
53 set the stage for the much larger parallel calculation.
54
54
55 In this example, we use two functions from :file:`pidigits.py`:
55 In this example, we use two functions from :file:`pidigits.py`:
56 :func:`one_digit_freqs` (which calculates how many times each digit occurs)
56 :func:`one_digit_freqs` (which calculates how many times each digit occurs)
57 and :func:`plot_one_digit_freqs` (which uses Matplotlib to plot the result).
57 and :func:`plot_one_digit_freqs` (which uses Matplotlib to plot the result).
58 Here is an interactive IPython session that uses these functions with
58 Here is an interactive IPython session that uses these functions with
59 SymPy:
59 SymPy:
60
60
61 .. sourcecode:: ipython
61 .. sourcecode:: ipython
62
62
63 In [7]: import sympy
63 In [7]: import sympy
64
64
65 In [8]: pi = sympy.pi.evalf(40)
65 In [8]: pi = sympy.pi.evalf(40)
66
66
67 In [9]: pi
67 In [9]: pi
68 Out[9]: 3.141592653589793238462643383279502884197
68 Out[9]: 3.141592653589793238462643383279502884197
69
69
70 In [10]: pi = sympy.pi.evalf(10000)
70 In [10]: pi = sympy.pi.evalf(10000)
71
71
72 In [11]: digits = (d for d in str(pi)[2:]) # create a sequence of digits
72 In [11]: digits = (d for d in str(pi)[2:]) # create a sequence of digits
73
73
74 In [12]: run pidigits.py # load one_digit_freqs/plot_one_digit_freqs
74 In [12]: run pidigits.py # load one_digit_freqs/plot_one_digit_freqs
75
75
76 In [13]: freqs = one_digit_freqs(digits)
76 In [13]: freqs = one_digit_freqs(digits)
77
77
78 In [14]: plot_one_digit_freqs(freqs)
78 In [14]: plot_one_digit_freqs(freqs)
79 Out[14]: [<matplotlib.lines.Line2D object at 0x18a55290>]
79 Out[14]: [<matplotlib.lines.Line2D object at 0x18a55290>]
80
80
81 The resulting plot of the single digit counts shows that each digit occurs
81 The resulting plot of the single digit counts shows that each digit occurs
82 approximately 1,000 times, but that with only 10,000 digits the
82 approximately 1,000 times, but that with only 10,000 digits the
83 statistical fluctuations are still rather large:
83 statistical fluctuations are still rather large:
84
84
85 .. image:: ../parallel/single_digits.*
85 .. image:: ../parallel/single_digits.*
86
86
87 It is clear that to reduce the relative fluctuations in the counts, we need
87 It is clear that to reduce the relative fluctuations in the counts, we need
88 to look at many more digits of pi. That brings us to the parallel calculation.
88 to look at many more digits of pi. That brings us to the parallel calculation.
89
89
90 Parallel calculation
90 Parallel calculation
91 --------------------
91 --------------------
92
92
93 Calculating many digits of pi is a challenging computational problem in itself.
93 Calculating many digits of pi is a challenging computational problem in itself.
94 Because we want to focus on the distribution of digits in this example, we
94 Because we want to focus on the distribution of digits in this example, we
95 will use pre-computed digits of pi from the website of Professor Yasumasa
95 will use pre-computed digits of pi from the website of Professor Yasumasa
96 Kanada at the University of Tokyo (http://www.super-computing.org). These
96 Kanada at the University of Tokyo (http://www.super-computing.org). These
97 digits come in a set of text files (ftp://pi.super-computing.org/.2/pi200m/)
97 digits come in a set of text files (ftp://pi.super-computing.org/.2/pi200m/)
98 that each have 10 million digits of pi.
98 that each have 10 million digits of pi.
99
99
100 For the parallel calculation, we have copied these files to the local hard
100 For the parallel calculation, we have copied these files to the local hard
101 drives of the compute nodes. A total of 15 of these files will be used, for a
101 drives of the compute nodes. A total of 15 of these files will be used, for a
102 total of 150 million digits of pi. To make things a little more interesting we
102 total of 150 million digits of pi. To make things a little more interesting we
103 will calculate the frequencies of all two-digit sequences (00-99) and then plot
103 will calculate the frequencies of all two-digit sequences (00-99) and then plot
104 the result using a 2D matrix in Matplotlib.
104 the result using a 2D matrix in Matplotlib.
105
105
106 The overall idea of the calculation is simple: each IPython engine will
106 The overall idea of the calculation is simple: each IPython engine will
107 compute the two-digit counts for the digits in a single file. Then in a final
107 compute the two-digit counts for the digits in a single file. Then in a final
108 step the counts from each engine will be added up. To perform this
108 step the counts from each engine will be added up. To perform this
109 calculation, we will need two top-level functions from :file:`pidigits.py`:
109 calculation, we will need two top-level functions from :file:`pidigits.py`:
110
110
111 .. literalinclude:: ../../examples/newparallel/pidigits.py
111 .. literalinclude:: ../../examples/newparallel/pidigits.py
112 :language: python
112 :language: python
113 :lines: 34-49
113 :lines: 41-56
114
114
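If you do not have the source checked out, the following hedged sketch
captures the shape of that map/reduce pair (the literalinclude above is
authoritative, and the two helpers are assumed to come from
:file:`pidigits.py`):

.. sourcecode:: python

    def compute_two_digit_freqs(filename):
        # map step, run on one engine: count two-digit sequences in one file
        d = txt_file_to_digits(filename)    # assumed helper
        return two_digit_freqs(d)           # assumed helper

    def reduce_freqs(freqlist):
        # reduce step, run locally: sum the per-engine frequency arrays
        allfreqs = freqlist[0]
        for f in freqlist[1:]:
            allfreqs = allfreqs + f
        return allfreqs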
115 We will also use the :func:`plot_two_digit_freqs` function to plot the
115 We will also use the :func:`plot_two_digit_freqs` function to plot the
116 results. The code to run this calculation in parallel is contained in
116 results. The code to run this calculation in parallel is contained in
117 :file:`docs/examples/newparallel/parallelpi.py`. This code can be run in parallel
117 :file:`docs/examples/newparallel/parallelpi.py`. This code can be run in parallel
118 using IPython by following these steps:
118 using IPython by following these steps:
119
119
120 1. Use :command:`ipclusterz` to start 15 engines. We used an 8 core (2 quad
120 1. Use :command:`ipclusterz` to start 15 engines. We used an 8 core (2 quad
121 core CPUs) cluster with hyperthreading enabled, which makes the 8 cores
121 core CPUs) cluster with hyperthreading enabled, which makes the 8 cores
122 look like 16 (1 controller + 15 engines) in the OS. However, the maximum
122 look like 16 (1 controller + 15 engines) in the OS. However, the maximum
123 speedup we can observe is still only 8x.
123 speedup we can observe is still only 8x.
124 2. With the file :file:`parallelpi.py` in your current working directory, open
124 2. With the file :file:`parallelpi.py` in your current working directory, open
125 up IPython in pylab mode and type ``run parallelpi.py``. This will download
125 up IPython in pylab mode and type ``run parallelpi.py``. This will download
126 the pi files via ftp the first time you run it, if they are not
126 the pi files via ftp the first time you run it, if they are not
127 present in the Engines' working directory.
127 present in the Engines' working directory.
128
128
129 When run on our 8 core cluster, we observe a speedup of 7.7x. This is slightly
129 When run on our 8 core cluster, we observe a speedup of 7.7x. This is slightly
130 less than linear scaling (8x) because the controller is also running on one of
130 less than linear scaling (8x) because the controller is also running on one of
131 the cores.
131 the cores.
132
132
133 To emphasize the interactive nature of IPython, we now show how the
133 To emphasize the interactive nature of IPython, we now show how the
134 calculation can also be run by simply typing the commands from
134 calculation can also be run by simply typing the commands from
135 :file:`parallelpi.py` interactively into IPython:
135 :file:`parallelpi.py` interactively into IPython:
136
136
137 .. sourcecode:: ipython
137 .. sourcecode:: ipython
138
138
139 In [1]: from IPython.zmq.parallel import client
139 In [1]: from IPython.zmq.parallel import client
140
140
141 # The Client allows us to use the engines interactively.
141 # The Client allows us to use the engines interactively.
142 # We simply pass Client the name of the cluster profile we
142 # We simply pass Client the name of the cluster profile we
143 # are using.
143 # are using.
144 In [2]: c = client.Client(profile='mycluster')
144 In [2]: c = client.Client(profile='mycluster')
145
145
146 In [3]: c.ids
146 In [3]: c.ids
147 Out[3]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
147 Out[3]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14]
148
148
149 In [4]: run pidigits.py
149 In [4]: run pidigits.py
150
150
151 In [5]: filestring = 'pi200m.ascii.%(i)02dof20'
151 In [5]: filestring = 'pi200m.ascii.%(i)02dof20'
152
152
153 # Create the list of files to process.
153 # Create the list of files to process.
154 In [6]: files = [filestring % {'i':i} for i in range(1,16)]
154 In [6]: files = [filestring % {'i':i} for i in range(1,16)]
155
155
156 In [7]: files
156 In [7]: files
157 Out[7]:
157 Out[7]:
158 ['pi200m.ascii.01of20',
158 ['pi200m.ascii.01of20',
159 'pi200m.ascii.02of20',
159 'pi200m.ascii.02of20',
160 'pi200m.ascii.03of20',
160 'pi200m.ascii.03of20',
161 'pi200m.ascii.04of20',
161 'pi200m.ascii.04of20',
162 'pi200m.ascii.05of20',
162 'pi200m.ascii.05of20',
163 'pi200m.ascii.06of20',
163 'pi200m.ascii.06of20',
164 'pi200m.ascii.07of20',
164 'pi200m.ascii.07of20',
165 'pi200m.ascii.08of20',
165 'pi200m.ascii.08of20',
166 'pi200m.ascii.09of20',
166 'pi200m.ascii.09of20',
167 'pi200m.ascii.10of20',
167 'pi200m.ascii.10of20',
168 'pi200m.ascii.11of20',
168 'pi200m.ascii.11of20',
169 'pi200m.ascii.12of20',
169 'pi200m.ascii.12of20',
170 'pi200m.ascii.13of20',
170 'pi200m.ascii.13of20',
171 'pi200m.ascii.14of20',
171 'pi200m.ascii.14of20',
172 'pi200m.ascii.15of20']
172 'pi200m.ascii.15of20']
173
173
174 # download the data files if they don't already exist:
174 # download the data files if they don't already exist:
175 In [8]: c.map(fetch_pi_file, files)
175 In [8]: c.map(fetch_pi_file, files)
176
176
177 # This is the parallel calculation using the Client.map method
177 # This is the parallel calculation using the Client.map method
178 # which applies compute_two_digit_freqs to each file in files in parallel.
178 # which applies compute_two_digit_freqs to each file in files in parallel.
179 In [9]: freqs_all = c.map(compute_two_digit_freqs, files)
179 In [9]: freqs_all = c.map(compute_two_digit_freqs, files)
180
180
181 # Add up the frequencies from each engine.
181 # Add up the frequencies from each engine.
182 In [10]: freqs = reduce_freqs(freqs_all)
182 In [10]: freqs = reduce_freqs(freqs_all)
183
183
184 In [11]: plot_two_digit_freqs(freqs)
184 In [11]: plot_two_digit_freqs(freqs)
185 Out[11]: <matplotlib.image.AxesImage object at 0x18beb110>
185 Out[11]: <matplotlib.image.AxesImage object at 0x18beb110>
186
186
187 In [12]: plt.title('2 digit counts of 150m digits of pi')
187 In [12]: plt.title('2 digit counts of 150m digits of pi')
188 Out[12]: <matplotlib.text.Text object at 0x18d1f9b0>
188 Out[12]: <matplotlib.text.Text object at 0x18d1f9b0>
189
189
190 The resulting plot generated by Matplotlib is shown below. The colors indicate
190 The resulting plot generated by Matplotlib is shown below. The colors indicate
191 which two-digit sequences are more (red) or less (blue) likely to occur in the
191 which two-digit sequences are more (red) or less (blue) likely to occur in the
192 first 150 million digits of pi. We clearly see that the sequence "41" is
192 first 150 million digits of pi. We clearly see that the sequence "41" is
193 most likely and that "06" and "07" are least likely. Further analysis would
193 most likely and that "06" and "07" are least likely. Further analysis would
194 show that the relative size of the statistical fluctuations has decreased
194 show that the relative size of the statistical fluctuations has decreased
195 compared to the 10,000 digit calculation.
195 compared to the 10,000 digit calculation.
196
196
197 .. image:: ../parallel/two_digit_counts.*
197 .. image:: ../parallel/two_digit_counts.*
198
198
199
199
200 Parallel options pricing
200 Parallel options pricing
201 ========================
201 ========================
202
202
203 An option is a financial contract that gives the buyer of the contract the
203 An option is a financial contract that gives the buyer of the contract the
204 right to buy (a "call") or sell (a "put") a secondary asset (a stock for
204 right to buy (a "call") or sell (a "put") a secondary asset (a stock for
205 example) at a particular date in the future (the expiration date) for a
205 example) at a particular date in the future (the expiration date) for a
206 pre-agreed upon price (the strike price). For this right, the buyer pays the
206 pre-agreed upon price (the strike price). For this right, the buyer pays the
207 seller a premium (the option price). There are a wide variety of flavors of
207 seller a premium (the option price). There are a wide variety of flavors of
208 options (American, European, Asian, etc.) that are useful for different
208 options (American, European, Asian, etc.) that are useful for different
209 purposes: hedging against risk, speculation, etc.
209 purposes: hedging against risk, speculation, etc.
210
210
211 Much of modern finance is driven by the need to price these contracts
211 Much of modern finance is driven by the need to price these contracts
212 accurately based on what is known about the properties (such as volatility) of
212 accurately based on what is known about the properties (such as volatility) of
213 the underlying asset. One method of pricing options is to use a Monte Carlo
213 the underlying asset. One method of pricing options is to use a Monte Carlo
214 simulation of the underlying asset price. In this example we use this approach
214 simulation of the underlying asset price. In this example we use this approach
215 to price both European and Asian (path dependent) options for various strike
215 to price both European and Asian (path dependent) options for various strike
216 prices and volatilities.
216 prices and volatilities.
217
217
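To make the method concrete before introducing the IPython machinery, here is
a minimal hedged sketch of Monte Carlo pricing for a plain European call,
written independently of the :file:`mcpricer.py` implementation described
below:

.. sourcecode:: python

    import numpy as np

    def european_call_price(S0, K, sigma, r, T, n_paths=100000):
        # simulate terminal prices under risk-neutral geometric Brownian motion
        z = np.random.standard_normal(n_paths)
        ST = S0 * np.exp((r - 0.5 * sigma**2) * T + sigma * np.sqrt(T) * z)
        payoff = np.maximum(ST - K, 0.0)        # call payoff at expiration
        return np.exp(-r * T) * payoff.mean()   # discounted expected payoff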
218 The code for this example can be found in the :file:`docs/examples/kernel`
218 The code for this example can be found in the :file:`docs/examples/newparallel`
219 directory of the IPython source. The function :func:`price_options` in
219 directory of the IPython source. The function :func:`price_options` in
220 :file:`mcpricer.py` implements the basic Monte Carlo pricing algorithm using
220 :file:`mcpricer.py` implements the basic Monte Carlo pricing algorithm using
221 the NumPy package and is shown here:
221 the NumPy package and is shown here:
222
222
223 .. literalinclude:: ../../examples/kernel/mcpricer.py
223 .. literalinclude:: ../../examples/newparallel/mcpricer.py
224 :language: python
224 :language: python
225
225
226 To run this code in parallel, we will use IPython's :class:`LoadBalancedView` class,
226 To run this code in parallel, we will use IPython's :class:`LoadBalancedView` class,
227 which distributes work to the engines using dynamic load balancing. This
227 which distributes work to the engines using dynamic load balancing. This
228 view is a wrapper of the :class:`Client` class shown in
228 view is a wrapper of the :class:`Client` class shown in
229 the previous example. The parallel calculation using :class:`LoadBalancedView` can
229 the previous example. The parallel calculation using :class:`LoadBalancedView` can
230 be found in the file :file:`mcdriver.py`. The code in this file creates a
230 be found in the file :file:`mcdriver.py`. The code in this file creates a
231 :class:`LoadBalancedView` instance and then submits a set of tasks
231 :class:`LoadBalancedView` instance and then submits a set of tasks
232 that calculate the option prices for different
232 that calculate the option prices for different
233 volatilities and strike prices. The results are then plotted as a 2D contour
233 volatilities and strike prices. The results are then plotted as a 2D contour
234 plot using Matplotlib.
234 plot using Matplotlib.
235
235
236 .. literalinclude:: ../../examples/kernel/mcdriver.py
236 .. literalinclude:: ../../examples/newparallel/mcdriver.py
237 :language: python
237 :language: python
238
238
239 To use this code, start an IPython cluster using :command:`ipclusterz`, open
239 To use this code, start an IPython cluster using :command:`ipclusterz`, open
240 IPython in the pylab mode with the file :file:`mcdriver.py` in your current
240 IPython in the pylab mode with the file :file:`mcdriver.py` in your current
241 working directory and then type:
241 working directory and then type:
242
242
243 .. sourcecode:: ipython
243 .. sourcecode:: ipython
244
244
245 In [7]: run mcdriver.py
245 In [7]: run mcdriver.py
246 Submitted tasks: [0, 1, 2, ...]
246 Submitted tasks: [0, 1, 2, ...]
247
247
248 Once all the tasks have finished, the results can be plotted using the
248 Once all the tasks have finished, the results can be plotted using the
249 :func:`plot_options` function. Here we make contour plots of the Asian
249 :func:`plot_options` function. Here we make contour plots of the Asian
250 call and Asian put options as function of the volatility and strike price:
250 call and Asian put options as function of the volatility and strike price:
251
251
252 .. sourcecode:: ipython
252 .. sourcecode:: ipython
253
253
254 In [8]: plot_options(sigma_vals, K_vals, prices['acall'])
254 In [8]: plot_options(sigma_vals, K_vals, prices['acall'])
255
255
256 In [9]: plt.figure()
256 In [9]: plt.figure()
257 Out[9]: <matplotlib.figure.Figure object at 0x18c178d0>
257 Out[9]: <matplotlib.figure.Figure object at 0x18c178d0>
258
258
259 In [10]: plot_options(sigma_vals, K_vals, prices['aput'])
259 In [10]: plot_options(sigma_vals, K_vals, prices['aput'])
260
260
261 These results are shown in the two figures below. On an 8 core cluster the
261 These results are shown in the two figures below. On an 8 core cluster the
262 entire calculation (10 strike prices, 10 volatilities, 100,000 paths for each)
262 entire calculation (10 strike prices, 10 volatilities, 100,000 paths for each)
263 took 30 seconds in parallel, giving a speedup of 7.7x, which is comparable
263 took 30 seconds in parallel, giving a speedup of 7.7x, which is comparable
264 to the speedup observed in our previous example.
264 to the speedup observed in our previous example.
265
265
266 .. image:: ../parallel/asian_call.*
266 .. image:: ../parallel/asian_call.*
267
267
268 .. image:: ../parallel/asian_put.*
268 .. image:: ../parallel/asian_put.*
269
269
270 Conclusion
270 Conclusion
271 ==========
271 ==========
272
272
273 To conclude these examples, we summarize the key features of IPython's
273 To conclude these examples, we summarize the key features of IPython's
274 parallel architecture that have been demonstrated:
274 parallel architecture that have been demonstrated:
275
275
276 * Serial code can often be parallelized with only a few extra lines of code.
276 * Serial code can often be parallelized with only a few extra lines of code.
277 We have used the :class:`DirectView` and :class:`LoadBalancedView` classes
277 We have used the :class:`DirectView` and :class:`LoadBalancedView` classes
278 for this purpose.
278 for this purpose.
279 * The resulting parallel code can be run without ever leaving IPython's
279 * The resulting parallel code can be run without ever leaving IPython's
280 interactive shell.
280 interactive shell.
281 * Any data computed in parallel can be explored interactively through
281 * Any data computed in parallel can be explored interactively through
282 visualization or further numerical calculations.
282 visualization or further numerical calculations.
283 * We have run these examples on a cluster running Windows HPC Server 2008.
283 * We have run these examples on a cluster running Windows HPC Server 2008.
284 IPython's built-in support for the Windows HPC job scheduler makes it
284 IPython's built-in support for the Windows HPC job scheduler makes it
285 easy to get started with IPython's parallel capabilities.
285 easy to get started with IPython's parallel capabilities.
286
286
287 .. note::
287 .. note::
288
288
289 The newparallel code has never been run on Windows HPC Server, so the last
289 The newparallel code has never been run on Windows HPC Server, so the last
290 conclusion is untested.
290 conclusion is untested.