##// END OF EJS Templates
Minor improvements to the parallel computing stuff....
bgranger -
Show More
@@ -1,766 +1,773 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Facilities for handling client connections to the controller."""
4 """Facilities for handling client connections to the controller."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008-2009 The IPython Development Team
7 # Copyright (C) 2008-2009 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 from __future__ import with_statement
17 from __future__ import with_statement
18 import os
18 import os
19
19
20 from IPython.kernel.fcutil import (
20 from IPython.kernel.fcutil import (
21 Tub,
21 Tub,
22 find_furl,
22 find_furl,
23 is_valid_furl_or_file,
23 is_valid_furl_or_file,
24 validate_furl_or_file,
24 validate_furl_or_file,
25 FURLError
25 FURLError
26 )
26 )
27 from IPython.kernel.clusterdir import ClusterDir, ClusterDirError
27 from IPython.kernel.clusterdir import ClusterDir, ClusterDirError
28 from IPython.kernel.launcher import IPClusterLauncher
28 from IPython.kernel.launcher import IPClusterLauncher
29 from IPython.kernel.twistedutil import (
29 from IPython.kernel.twistedutil import (
30 gatherBoth,
30 gatherBoth,
31 make_deferred,
31 make_deferred,
32 blockingCallFromThread,
32 blockingCallFromThread,
33 sleep_deferred
33 sleep_deferred
34 )
34 )
35 from IPython.utils.importstring import import_item
35 from IPython.utils.importstring import import_item
36 from IPython.utils.genutils import get_ipython_dir
36 from IPython.utils.genutils import get_ipython_dir
37
37
38 from twisted.internet import defer
38 from twisted.internet import defer
39 from twisted.internet.defer import inlineCallbacks, returnValue
39 from twisted.internet.defer import inlineCallbacks, returnValue
40 from twisted.python import failure, log
40 from twisted.python import failure, log
41
41
42 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
43 # The ClientConnector class
43 # The ClientConnector class
44 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
45
45
46 DELAY = 0.2
46 DELAY = 0.2
47 MAX_TRIES = 9
47 MAX_TRIES = 9
48
48
49
49
class ClientConnectorError(Exception):
    """Error raised when a client fails to connect to a controller."""
52
52
53
53
class AsyncClientConnector(object):
    """A class for getting remote references and clients from FURLs.

    This starts a single :class:`Tub` for all remote references and caches
    the references by FURL.
    """

    def __init__(self):
        # Cache of remote references, keyed by FURL string.
        self._remote_refs = {}
        self.tub = Tub()
        self.tub.startService()

    def _find_furl(self, profile='default', cluster_dir=None,
                   furl_or_file=None, furl_file_name=None,
                   ipython_dir=None):
        """Find a FURL file by profile+ipython_dir or cluster dir.

        This raises an :exc:`~IPython.kernel.fcutil.FURLError` exception
        if a FURL file can't be found.
        """
        # An explicit FURL (or FURL file path) takes precedence.
        if furl_or_file is not None:
            validate_furl_or_file(furl_or_file)
            return furl_or_file

        if furl_file_name is None:
            raise FURLError('A furl_file_name must be provided')

        # Next, look in an explicitly given cluster directory.
        if cluster_dir is not None:
            cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
            sdir = cluster_dir_obj.security_dir
            furl_file = os.path.join(sdir, furl_file_name)
            validate_furl_or_file(furl_file)
            return furl_file

        # Finally, locate the cluster directory by profile.
        if ipython_dir is None:
            ipython_dir = get_ipython_dir()
        if profile is not None:
            cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
                ipython_dir, profile)
            sdir = cluster_dir_obj.security_dir
            furl_file = os.path.join(sdir, furl_file_name)
            validate_furl_or_file(furl_file)
            return furl_file

        raise FURLError('Could not find a valid FURL file.')

    def get_reference(self, furl_or_file):
        """Get a remote reference using a FURL or a file containing a FURL.

        Remote references are cached locally so once a remote reference
        has been retrieved for a given FURL, the cached version is
        returned.

        Parameters
        ----------
        furl_or_file : str
            A FURL or a filename containing a FURL.  This should already be
            validated, but might not yet exist.

        Returns
        -------
        A deferred to a remote reference.
        """
        furl = furl_or_file
        if furl in self._remote_refs:
            d = defer.succeed(self._remote_refs[furl])
        else:
            d = self.tub.getReference(furl)
            d.addCallback(self._save_ref, furl)
        return d

    def _save_ref(self, ref, furl):
        """Cache a remote reference by its FURL and pass the ref through."""
        self._remote_refs[furl] = ref
        return ref

    def get_task_client(self, profile='default', cluster_dir=None,
                        furl_or_file=None, ipython_dir=None,
                        delay=DELAY, max_tries=MAX_TRIES):
        """Get the task controller client.

        This method is a simple wrapper around `get_client` that passes in
        the default name of the task client FURL file
        ('ipcontroller-tc.furl').  Usually only the ``profile`` option will
        be needed; if a FURL file can't be found by its profile, use
        ``cluster_dir`` or ``furl_or_file``.  See `get_client` for the
        meaning of the parameters.

        Returns
        -------
        A deferred to the actual client class.
        """
        return self.get_client(
            profile, cluster_dir, furl_or_file,
            'ipcontroller-tc.furl', ipython_dir,
            delay, max_tries
        )

    def get_multiengine_client(self, profile='default', cluster_dir=None,
                               furl_or_file=None, ipython_dir=None,
                               delay=DELAY, max_tries=MAX_TRIES):
        """Get the multiengine controller client.

        This method is a simple wrapper around `get_client` that passes in
        the default name of the multiengine client FURL file
        ('ipcontroller-mec.furl').  Usually only the ``profile`` option will
        be needed; if a FURL file can't be found by its profile, use
        ``cluster_dir`` or ``furl_or_file``.  See `get_client` for the
        meaning of the parameters.

        Returns
        -------
        A deferred to the actual client class.
        """
        return self.get_client(
            profile, cluster_dir, furl_or_file,
            'ipcontroller-mec.furl', ipython_dir,
            delay, max_tries
        )

    def get_client(self, profile='default', cluster_dir=None,
                   furl_or_file=None, furl_file_name=None, ipython_dir=None,
                   delay=DELAY, max_tries=MAX_TRIES):
        """Get a remote reference and wrap it in a client by FURL.

        The FURL file is located with `_find_furl`, the controller is
        connected to with retry logic, and the resulting remote reference
        is adapted to the client class whose name the controller reports.

        Parameters
        ----------
        profile : str
            The name of a cluster directory profile (default="default").  The
            cluster directory "cluster_<profile>" will be searched for
            in ``os.getcwd()``, the ipython_dir and then in the directories
            listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
        cluster_dir : str
            The full path to a cluster directory.  This is useful if profiles
            are not being used.
        furl_or_file : str
            A FURL or a filename containing a FURL.  This is useful if you
            simply know the location of the FURL file.
        furl_file_name : str
            The filename (not the full path) of the FURL.  This must be
            provided if ``furl_or_file`` is not.
        ipython_dir : str
            The location of the ipython_dir if different from the default.
            This is used if the cluster directory is being found by profile.
        delay : float
            The initial delay between re-connection attempts.  Subsequent
            delays get longer according to ``delay[i] = 1.5*delay[i-1]``.
        max_tries : int
            The max number of re-connection attempts.

        Returns
        -------
        A deferred to the actual client class.  Or a failure to a
        :exc:`FURLError`.
        """
        try:
            furl_file = self._find_furl(
                profile, cluster_dir, furl_or_file,
                furl_file_name, ipython_dir
            )
        except FURLError:
            return defer.fail(failure.Failure())

        def _wrap_remote_reference(rr):
            # Ask the controller which client class should wrap this
            # reference, import it by name, and adapt the reference to it.
            d = rr.callRemote('get_client_name')
            d.addCallback(lambda name: import_item(name))
            def adapt(client_interface):
                client = client_interface(rr)
                client.tub = self.tub
                return client
            d.addCallback(adapt)

            return d

        d = self._try_to_connect(furl_file, delay, max_tries, attempt=0)
        d.addCallback(_wrap_remote_reference)
        d.addErrback(self._handle_error, furl_file)
        return d

    def _handle_error(self, f, furl_file):
        # Convert any connection failure into a ClientConnectorError with a
        # user-oriented message.  Note the original failure ``f`` is
        # discarded here.
        raise ClientConnectorError('Could not connect to the controller '
            'using the FURL file. This usually means that i) the controller '
            'was not started or ii) a firewall was blocking the client from '
            'connecting to the controller: %s' % furl_file)

    @inlineCallbacks
    def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
        """Try to connect to the controller with retry logic."""
        if attempt < max_tries:
            log.msg("Connecting [%r]" % attempt)
            try:
                self.furl = find_furl(furl_or_file)
                # Uncomment this to see the FURL being tried.
                # log.msg("FURL: %s" % self.furl)
                rr = yield self.get_reference(self.furl)
                log.msg("Connected: %s" % furl_or_file)
            # Narrowed from a bare ``except:`` so GeneratorExit and
            # KeyboardInterrupt are not swallowed inside this
            # inlineCallbacks generator.
            except Exception:
                if attempt == max_tries - 1:
                    # This will propagate the exception all the way to the
                    # top where it can be handled.
                    raise
                else:
                    yield sleep_deferred(delay)
                    rr = yield self._try_to_connect(
                        furl_or_file, 1.5*delay, max_tries, attempt+1
                    )
                    returnValue(rr)
            else:
                returnValue(rr)
        else:
            raise ClientConnectorError(
                'Could not connect to controller, max_tries (%r) exceeded. '
                'This usually means that i) the controller was not started, '
                'or ii) a firewall was blocking the client from connecting '
                'to the controller.' % max_tries
            )
311
318
312
319
class ClientConnector(object):
    """A blocking version of a client connector.

    This class creates a single :class:`Tub` instance and allows remote
    references and clients to be retrieved by their FURLs.  Remote references
    are cached locally and FURL files can be found using profiles and cluster
    directories.

    All work is delegated to an :class:`AsyncClientConnector` driven via
    :func:`blockingCallFromThread`.
    """

    def __init__(self):
        self.async_cc = AsyncClientConnector()

    def get_task_client(self, profile='default', cluster_dir=None,
                        furl_or_file=None, ipython_dir=None,
                        delay=DELAY, max_tries=MAX_TRIES):
        """Get the task client.

        Usually only the ``profile`` option will be needed.  If a FURL file
        can't be found by its profile, use ``cluster_dir`` or
        ``furl_or_file``.

        Parameters
        ----------
        profile : str
            The name of a cluster directory profile (default="default").  The
            cluster directory "cluster_<profile>" will be searched for
            in ``os.getcwd()``, the ipython_dir and then in the directories
            listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
        cluster_dir : str
            The full path to a cluster directory.  This is useful if profiles
            are not being used.
        furl_or_file : str
            A FURL or a filename containing a FURL.  This is useful if you
            simply know the location of the FURL file.
        ipython_dir : str
            The location of the ipython_dir if different from the default.
            This is used if the cluster directory is being found by profile.
        delay : float
            The initial delay between re-connection attempts.  Subsequent
            delays get longer according to ``delay[i] = 1.5*delay[i-1]``.
        max_tries : int
            The max number of re-connection attempts.

        Returns
        -------
        The task client instance.
        """
        client = blockingCallFromThread(
            self.async_cc.get_task_client, profile, cluster_dir,
            furl_or_file, ipython_dir, delay, max_tries
        )
        return client.adapt_to_blocking_client()

    def get_multiengine_client(self, profile='default', cluster_dir=None,
                               furl_or_file=None, ipython_dir=None,
                               delay=DELAY, max_tries=MAX_TRIES):
        """Get the multiengine client.

        Usually only the ``profile`` option will be needed.  If a FURL file
        can't be found by its profile, use ``cluster_dir`` or
        ``furl_or_file``.  See `get_task_client` for the meaning of the
        parameters.

        Returns
        -------
        The multiengine client instance.
        """
        client = blockingCallFromThread(
            self.async_cc.get_multiengine_client, profile, cluster_dir,
            furl_or_file, ipython_dir, delay, max_tries
        )
        return client.adapt_to_blocking_client()

    def get_client(self, profile='default', cluster_dir=None,
                   furl_or_file=None, ipython_dir=None,
                   delay=DELAY, max_tries=MAX_TRIES):
        """Get a generic blocking client.

        This method takes no ``furl_file_name``, so the FURL must be
        resolvable from ``furl_or_file`` (or the profile/cluster_dir
        lookup).  See `get_task_client` for the meaning of the parameters.

        Returns
        -------
        The blocking client instance.
        """
        # BUG FIX: ``None`` must be passed for the ``furl_file_name``
        # positional parameter of AsyncClientConnector.get_client; without
        # it, ipython_dir/delay/max_tries were each shifted one slot left.
        client = blockingCallFromThread(
            self.async_cc.get_client, profile, cluster_dir,
            furl_or_file, None, ipython_dir,
            delay, max_tries
        )
        return client.adapt_to_blocking_client()
416
423
417
424
class ClusterStateError(Exception):
    """Error raised for problems with the cluster's state."""
420
427
421
428
422 class AsyncCluster(object):
429 class AsyncCluster(object):
423 """An class that wraps the :command:`ipcluster` script."""
430 """An class that wraps the :command:`ipcluster` script."""
424
431
425 def __init__(self, profile='default', cluster_dir=None, ipython_dir=None,
432 def __init__(self, profile='default', cluster_dir=None, ipython_dir=None,
426 auto_create=False, auto_stop=True):
433 auto_create=False, auto_stop=True):
427 """Create a class to manage an IPython cluster.
434 """Create a class to manage an IPython cluster.
428
435
429 This class calls the :command:`ipcluster` command with the right
436 This class calls the :command:`ipcluster` command with the right
430 options to start an IPython cluster. Typically a cluster directory
437 options to start an IPython cluster. Typically a cluster directory
431 must be created (:command:`ipcluster create`) and configured before
438 must be created (:command:`ipcluster create`) and configured before
432 using this class. Configuration is done by editing the
439 using this class. Configuration is done by editing the
433 configuration files in the top level of the cluster directory.
440 configuration files in the top level of the cluster directory.
434
441
435 Parameters
442 Parameters
436 ----------
443 ----------
437 profile : str
444 profile : str
438 The name of a cluster directory profile (default="default"). The
445 The name of a cluster directory profile (default="default"). The
439 cluster directory "cluster_<profile>" will be searched for
446 cluster directory "cluster_<profile>" will be searched for
440 in ``os.getcwd()``, the ipython_dir and then in the directories
447 in ``os.getcwd()``, the ipython_dir and then in the directories
441 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
448 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
442 cluster_dir : str
449 cluster_dir : str
443 The full path to a cluster directory. This is useful if profiles
450 The full path to a cluster directory. This is useful if profiles
444 are not being used.
451 are not being used.
445 ipython_dir : str
452 ipython_dir : str
446 The location of the ipython_dir if different from the default.
453 The location of the ipython_dir if different from the default.
447 This is used if the cluster directory is being found by profile.
454 This is used if the cluster directory is being found by profile.
448 auto_create : bool
455 auto_create : bool
449 Automatically create the cluster directory if it doesn't exist.
456 Automatically create the cluster directory if it doesn't exist.
450 This will usually only make sense if using a local cluster
457 This will usually only make sense if using a local cluster
451 (default=False).
458 (default=False).
452 auto_stop : bool
459 auto_stop : bool
453 Automatically stop the cluster when this instance is garbage
460 Automatically stop the cluster when this instance is garbage
454 collected (default=True). This is useful if you want the cluster
461 collected (default=True). This is useful if you want the cluster
455 to live beyond your current process. There is also an instance
462 to live beyond your current process. There is also an instance
456 attribute ``auto_stop`` to change this behavior.
463 attribute ``auto_stop`` to change this behavior.
457 """
464 """
458 self._setup_cluster_dir(profile, cluster_dir, ipython_dir, auto_create)
465 self._setup_cluster_dir(profile, cluster_dir, ipython_dir, auto_create)
459 self.state = 'before'
466 self.state = 'before'
460 self.launcher = None
467 self.launcher = None
461 self.client_connector = None
468 self.client_connector = None
462 self.auto_stop = auto_stop
469 self.auto_stop = auto_stop
463
470
464 def __del__(self):
471 def __del__(self):
465 if self.auto_stop and self.state=='running':
472 if self.auto_stop and self.state=='running':
466 print "Auto stopping the cluster..."
473 print "Auto stopping the cluster..."
467 self.stop()
474 self.stop()
468
475
469 @property
476 @property
470 def location(self):
477 def location(self):
471 if hasattr(self, 'cluster_dir_obj'):
478 if hasattr(self, 'cluster_dir_obj'):
472 return self.cluster_dir_obj.location
479 return self.cluster_dir_obj.location
473 else:
480 else:
474 return ''
481 return ''
475
482
476 @property
483 @property
477 def running(self):
484 def running(self):
478 if self.state=='running':
485 if self.state=='running':
479 return True
486 return True
480 else:
487 else:
481 return False
488 return False
482
489
483 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir, auto_create):
490 def _setup_cluster_dir(self, profile, cluster_dir, ipython_dir, auto_create):
484 if ipython_dir is None:
491 if ipython_dir is None:
485 ipython_dir = get_ipython_dir()
492 ipython_dir = get_ipython_dir()
486 if cluster_dir is not None:
493 if cluster_dir is not None:
487 try:
494 try:
488 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
495 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
489 except ClusterDirError:
496 except ClusterDirError:
490 pass
497 pass
491 if profile is not None:
498 if profile is not None:
492 try:
499 try:
493 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
500 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
494 ipython_dir, profile)
501 ipython_dir, profile)
495 except ClusterDirError:
502 except ClusterDirError:
496 pass
503 pass
497 if auto_create or profile=='default':
504 if auto_create or profile=='default':
498 # This should call 'ipcluster create --profile default
505 # This should call 'ipcluster create --profile default
499 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
506 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
500 ipython_dir, profile)
507 ipython_dir, profile)
501 else:
508 else:
502 raise ClusterDirError('Cluster dir not found.')
509 raise ClusterDirError('Cluster dir not found.')
503
510
504 @make_deferred
511 @make_deferred
505 def start(self, n=2):
512 def start(self, n=2):
506 """Start the IPython cluster with n engines.
513 """Start the IPython cluster with n engines.
507
514
508 Parameters
515 Parameters
509 ----------
516 ----------
510 n : int
517 n : int
511 The number of engines to start.
518 The number of engines to start.
512 """
519 """
513 # We might want to add logic to test if the cluster has started
520 # We might want to add logic to test if the cluster has started
514 # by another process....
521 # by another process....
515 if not self.state=='running':
522 if not self.state=='running':
516 self.launcher = IPClusterLauncher(os.getcwd())
523 self.launcher = IPClusterLauncher(os.getcwd())
517 self.launcher.ipcluster_n = n
524 self.launcher.ipcluster_n = n
518 self.launcher.ipcluster_subcommand = 'start'
525 self.launcher.ipcluster_subcommand = 'start'
519 d = self.launcher.start()
526 d = self.launcher.start()
520 d.addCallback(self._handle_start)
527 d.addCallback(self._handle_start)
521 return d
528 return d
522 else:
529 else:
523 raise ClusterStateError('Cluster is already running')
530 raise ClusterStateError('Cluster is already running')
524
531
525 @make_deferred
532 @make_deferred
526 def stop(self):
533 def stop(self):
527 """Stop the IPython cluster if it is running."""
534 """Stop the IPython cluster if it is running."""
528 if self.state=='running':
535 if self.state=='running':
529 d1 = self.launcher.observe_stop()
536 d1 = self.launcher.observe_stop()
530 d1.addCallback(self._handle_stop)
537 d1.addCallback(self._handle_stop)
531 d2 = self.launcher.stop()
538 d2 = self.launcher.stop()
532 return gatherBoth([d1, d2], consumeErrors=True)
539 return gatherBoth([d1, d2], consumeErrors=True)
533 else:
540 else:
534 raise ClusterStateError("Cluster not running")
541 raise ClusterStateError("Cluster not running")
535
542
536 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
543 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
537 """Get the multiengine client for the running cluster.
544 """Get the multiengine client for the running cluster.
538
545
539 If this fails, it means that the cluster has not finished starting.
546 If this fails, it means that the cluster has not finished starting.
540 Usually waiting a few seconds and re-trying will solve this.
547 Usually waiting a few seconds and re-trying will solve this.
541 """
548 """
542 if self.client_connector is None:
549 if self.client_connector is None:
543 self.client_connector = AsyncClientConnector()
550 self.client_connector = AsyncClientConnector()
544 return self.client_connector.get_multiengine_client(
551 return self.client_connector.get_multiengine_client(
545 cluster_dir=self.cluster_dir_obj.location,
552 cluster_dir=self.cluster_dir_obj.location,
546 delay=delay, max_tries=max_tries
553 delay=delay, max_tries=max_tries
547 )
554 )
548
555
549 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
556 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
550 """Get the task client for the running cluster.
557 """Get the task client for the running cluster.
551
558
552 If this fails, it means that the cluster has not finished starting.
559 If this fails, it means that the cluster has not finished starting.
553 Usually waiting a few seconds and re-trying will solve this.
560 Usually waiting a few seconds and re-trying will solve this.
554 """
561 """
555 if self.client_connector is None:
562 if self.client_connector is None:
556 self.client_connector = AsyncClientConnector()
563 self.client_connector = AsyncClientConnector()
557 return self.client_connector.get_task_client(
564 return self.client_connector.get_task_client(
558 cluster_dir=self.cluster_dir_obj.location,
565 cluster_dir=self.cluster_dir_obj.location,
559 delay=delay, max_tries=max_tries
566 delay=delay, max_tries=max_tries
560 )
567 )
561
568
562 def get_ipengine_logs(self):
569 def get_ipengine_logs(self):
563 return self.get_logs_by_name('ipengine')
570 return self.get_logs_by_name('ipengine')
564
571
565 def get_ipcontroller_logs(self):
572 def get_ipcontroller_logs(self):
566 return self.get_logs_by_name('ipcontroller')
573 return self.get_logs_by_name('ipcontroller')
567
574
568 def get_ipcluster_logs(self):
575 def get_ipcluster_logs(self):
569 return self.get_logs_by_name('ipcluster')
576 return self.get_logs_by_name('ipcluster')
570
577
571 def get_logs_by_name(self, name='ipcluster'):
578 def get_logs_by_name(self, name='ipcluster'):
572 log_dir = self.cluster_dir_obj.log_dir
579 log_dir = self.cluster_dir_obj.log_dir
573 logs = {}
580 logs = {}
574 for log in os.listdir(log_dir):
581 for log in os.listdir(log_dir):
575 if log.startswith(name + '-') and log.endswith('.log'):
582 if log.startswith(name + '-') and log.endswith('.log'):
576 with open(os.path.join(log_dir, log), 'r') as f:
583 with open(os.path.join(log_dir, log), 'r') as f:
577 logs[log] = f.read()
584 logs[log] = f.read()
578 return logs
585 return logs
579
586
580 def get_logs(self):
587 def get_logs(self):
581 d = self.get_ipcluster_logs()
588 d = self.get_ipcluster_logs()
582 d.update(self.get_ipengine_logs())
589 d.update(self.get_ipengine_logs())
583 d.update(self.get_ipcontroller_logs())
590 d.update(self.get_ipcontroller_logs())
584 return d
591 return d
585
592
586 def _handle_start(self, r):
593 def _handle_start(self, r):
587 self.state = 'running'
594 self.state = 'running'
588
595
589 def _handle_stop(self, r):
596 def _handle_stop(self, r):
590 self.state = 'after'
597 self.state = 'after'
591
598
592
599
593 class Cluster(object):
600 class Cluster(object):
594
601
595
602
596 def __init__(self, profile='default', cluster_dir=None, ipython_dir=None,
603 def __init__(self, profile='default', cluster_dir=None, ipython_dir=None,
597 auto_create=False, auto_stop=True):
604 auto_create=False, auto_stop=True):
598 """Create a class to manage an IPython cluster.
605 """Create a class to manage an IPython cluster.
599
606
600 This class calls the :command:`ipcluster` command with the right
607 This class calls the :command:`ipcluster` command with the right
601 options to start an IPython cluster. Typically a cluster directory
608 options to start an IPython cluster. Typically a cluster directory
602 must be created (:command:`ipcluster create`) and configured before
609 must be created (:command:`ipcluster create`) and configured before
603 using this class. Configuration is done by editing the
610 using this class. Configuration is done by editing the
604 configuration files in the top level of the cluster directory.
611 configuration files in the top level of the cluster directory.
605
612
606 Parameters
613 Parameters
607 ----------
614 ----------
608 profile : str
615 profile : str
609 The name of a cluster directory profile (default="default"). The
616 The name of a cluster directory profile (default="default"). The
610 cluster directory "cluster_<profile>" will be searched for
617 cluster directory "cluster_<profile>" will be searched for
611 in ``os.getcwd()``, the ipython_dir and then in the directories
618 in ``os.getcwd()``, the ipython_dir and then in the directories
612 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
619 listed in the :env:`IPCLUSTER_DIR_PATH` environment variable.
613 cluster_dir : str
620 cluster_dir : str
614 The full path to a cluster directory. This is useful if profiles
621 The full path to a cluster directory. This is useful if profiles
615 are not being used.
622 are not being used.
616 ipython_dir : str
623 ipython_dir : str
617 The location of the ipython_dir if different from the default.
624 The location of the ipython_dir if different from the default.
618 This is used if the cluster directory is being found by profile.
625 This is used if the cluster directory is being found by profile.
619 auto_create : bool
626 auto_create : bool
620 Automatically create the cluster directory if it doesn't exist.
627 Automatically create the cluster directory if it doesn't exist.
621 This will usually only make sense if using a local cluster
628 This will usually only make sense if using a local cluster
622 (default=False).
629 (default=False).
623 auto_stop : bool
630 auto_stop : bool
624 Automatically stop the cluster when this instance is garbage
631 Automatically stop the cluster when this instance is garbage
625 collected (default=True). This is useful if you want the cluster
632 collected (default=True). This is useful if you want the cluster
626 to live beyond your current process. There is also an instance
633 to live beyond your current process. There is also an instance
627 attribute ``auto_stop`` to change this behavior.
634 attribute ``auto_stop`` to change this behavior.
628 """
635 """
629 self.async_cluster = AsyncCluster(
636 self.async_cluster = AsyncCluster(
630 profile, cluster_dir, ipython_dir, auto_create, auto_stop
637 profile, cluster_dir, ipython_dir, auto_create, auto_stop
631 )
638 )
632 self.cluster_dir_obj = self.async_cluster.cluster_dir_obj
639 self.cluster_dir_obj = self.async_cluster.cluster_dir_obj
633 self.client_connector = None
640 self.client_connector = None
634
641
635 def _set_auto_stop(self, value):
642 def _set_auto_stop(self, value):
636 self.async_cluster.auto_stop = value
643 self.async_cluster.auto_stop = value
637
644
638 def _get_auto_stop(self):
645 def _get_auto_stop(self):
639 return self.async_cluster.auto_stop
646 return self.async_cluster.auto_stop
640
647
641 auto_stop = property(_get_auto_stop, _set_auto_stop)
648 auto_stop = property(_get_auto_stop, _set_auto_stop)
642
649
643 @property
650 @property
644 def location(self):
651 def location(self):
645 return self.async_cluster.location
652 return self.async_cluster.location
646
653
647 @property
654 @property
648 def running(self):
655 def running(self):
649 return self.async_cluster.running
656 return self.async_cluster.running
650
657
651 def start(self, n=2):
658 def start(self, n=2):
652 """Start the IPython cluster with n engines.
659 """Start the IPython cluster with n engines.
653
660
654 Parameters
661 Parameters
655 ----------
662 ----------
656 n : int
663 n : int
657 The number of engines to start.
664 The number of engines to start.
658 """
665 """
659 return blockingCallFromThread(self.async_cluster.start, n)
666 return blockingCallFromThread(self.async_cluster.start, n)
660
667
661 def stop(self):
668 def stop(self):
662 """Stop the IPython cluster if it is running."""
669 """Stop the IPython cluster if it is running."""
663 return blockingCallFromThread(self.async_cluster.stop)
670 return blockingCallFromThread(self.async_cluster.stop)
664
671
665 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
672 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
666 """Get the multiengine client for the running cluster.
673 """Get the multiengine client for the running cluster.
667
674
668 This will attempt to connect to the controller multiple times. If this
675 This will attempt to connect to the controller multiple times. If this
669 fails altogether, try looking at the following:
676 fails altogether, try looking at the following:
670 * Make sure the controller is starting properly by looking at its
677 * Make sure the controller is starting properly by looking at its
671 log files.
678 log files.
672 * Make sure the controller is writing its FURL file in the location
679 * Make sure the controller is writing its FURL file in the location
673 expected by the client.
680 expected by the client.
674 * Make sure a firewall on the controller's host is not blocking the
681 * Make sure a firewall on the controller's host is not blocking the
675 client from connecting.
682 client from connecting.
676
683
677 Parameters
684 Parameters
678 ----------
685 ----------
679 delay : float
686 delay : float
680 The initial delay between re-connection attempts. Subsequent delays
687 The initial delay between re-connection attempts. Subsequent delays
681 get longer according to ``delay[i] = 1.5*delay[i-1]``.
688 get longer according to ``delay[i] = 1.5*delay[i-1]``.
682 max_tries : int
689 max_tries : int
683 The max number of re-connection attempts.
690 The max number of re-connection attempts.
684 """
691 """
685 if self.client_connector is None:
692 if self.client_connector is None:
686 self.client_connector = ClientConnector()
693 self.client_connector = ClientConnector()
687 return self.client_connector.get_multiengine_client(
694 return self.client_connector.get_multiengine_client(
688 cluster_dir=self.cluster_dir_obj.location,
695 cluster_dir=self.cluster_dir_obj.location,
689 delay=delay, max_tries=max_tries
696 delay=delay, max_tries=max_tries
690 )
697 )
691
698
692 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
699 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
693 """Get the task client for the running cluster.
700 """Get the task client for the running cluster.
694
701
695 This will attempt to connect to the controller multiple times. If this
702 This will attempt to connect to the controller multiple times. If this
696 fails altogether, try looking at the following:
703 fails altogether, try looking at the following:
697 * Make sure the controller is starting properly by looking at its
704 * Make sure the controller is starting properly by looking at its
698 log files.
705 log files.
699 * Make sure the controller is writing its FURL file in the location
706 * Make sure the controller is writing its FURL file in the location
700 expected by the client.
707 expected by the client.
701 * Make sure a firewall on the controller's host is not blocking the
708 * Make sure a firewall on the controller's host is not blocking the
702 client from connecting.
709 client from connecting.
703
710
704 Parameters
711 Parameters
705 ----------
712 ----------
706 delay : float
713 delay : float
707 The initial delay between re-connection attempts. Subsequent delays
714 The initial delay between re-connection attempts. Subsequent delays
708 get longer according to ``delay[i] = 1.5*delay[i-1]``.
715 get longer according to ``delay[i] = 1.5*delay[i-1]``.
709 max_tries : int
716 max_tries : int
710 The max number of re-connection attempts.
717 The max number of re-connection attempts.
711 """
718 """
712 if self.client_connector is None:
719 if self.client_connector is None:
713 self.client_connector = ClientConnector()
720 self.client_connector = ClientConnector()
714 return self.client_connector.get_task_client(
721 return self.client_connector.get_task_client(
715 cluster_dir=self.cluster_dir_obj.location,
722 cluster_dir=self.cluster_dir_obj.location,
716 delay=delay, max_tries=max_tries
723 delay=delay, max_tries=max_tries
717 )
724 )
718
725
719 def __repr__(self):
726 def __repr__(self):
720 s = "<Cluster(running=%r, location=%s)" % (self.running, self.location)
727 s = "<Cluster(running=%r, location=%s)" % (self.running, self.location)
721 return s
728 return s
722
729
723 def get_logs_by_name(self, name='ipcluter'):
730 def get_logs_by_name(self, name='ipcluter'):
724 """Get a dict of logs by process name (ipcluster, ipengine, etc.)"""
731 """Get a dict of logs by process name (ipcluster, ipengine, etc.)"""
725 return self.async_cluster.get_logs_by_name(name)
732 return self.async_cluster.get_logs_by_name(name)
726
733
727 def get_ipengine_logs(self):
734 def get_ipengine_logs(self):
728 """Get a dict of logs for all engines in this cluster."""
735 """Get a dict of logs for all engines in this cluster."""
729 return self.async_cluster.get_ipengine_logs()
736 return self.async_cluster.get_ipengine_logs()
730
737
731 def get_ipcontroller_logs(self):
738 def get_ipcontroller_logs(self):
732 """Get a dict of logs for the controller in this cluster."""
739 """Get a dict of logs for the controller in this cluster."""
733 return self.async_cluster.get_ipcontroller_logs()
740 return self.async_cluster.get_ipcontroller_logs()
734
741
735 def get_ipcluster_logs(self):
742 def get_ipcluster_logs(self):
736 """Get a dict of the ipcluster logs for this cluster."""
743 """Get a dict of the ipcluster logs for this cluster."""
737 return self.async_cluster.get_ipcluster_logs()
744 return self.async_cluster.get_ipcluster_logs()
738
745
739 def get_logs(self):
746 def get_logs(self):
740 """Get a dict of all logs for this cluster."""
747 """Get a dict of all logs for this cluster."""
741 return self.async_cluster.get_logs()
748 return self.async_cluster.get_logs()
742
749
743 def _print_logs(self, logs):
750 def _print_logs(self, logs):
744 for k, v in logs.iteritems():
751 for k, v in logs.iteritems():
745 print "==================================="
752 print "==================================="
746 print "Logfile: %s" % k
753 print "Logfile: %s" % k
747 print "==================================="
754 print "==================================="
748 print v
755 print v
749 print
756 print
750
757
751 def print_ipengine_logs(self):
758 def print_ipengine_logs(self):
752 """Print the ipengine logs for this cluster to stdout."""
759 """Print the ipengine logs for this cluster to stdout."""
753 self._print_logs(self.get_ipengine_logs())
760 self._print_logs(self.get_ipengine_logs())
754
761
755 def print_ipcontroller_logs(self):
762 def print_ipcontroller_logs(self):
756 """Print the ipcontroller logs for this cluster to stdout."""
763 """Print the ipcontroller logs for this cluster to stdout."""
757 self._print_logs(self.get_ipcontroller_logs())
764 self._print_logs(self.get_ipcontroller_logs())
758
765
759 def print_ipcluster_logs(self):
766 def print_ipcluster_logs(self):
760 """Print the ipcluster logs for this cluster to stdout."""
767 """Print the ipcluster logs for this cluster to stdout."""
761 self._print_logs(self.get_ipcluster_logs())
768 self._print_logs(self.get_ipcluster_logs())
762
769
763 def print_logs(self):
770 def print_logs(self):
764 """Print all the logs for this cluster to stdout."""
771 """Print all the logs for this cluster to stdout."""
765 self._print_logs(self.get_logs())
772 self._print_logs(self.get_logs())
766
773
@@ -1,457 +1,457 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 The ipcluster application.
4 The ipcluster application.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 import logging
18 import logging
19 import os
19 import os
20 import signal
20 import signal
21 import sys
21 import sys
22
22
23 if os.name=='posix':
23 if os.name=='posix':
24 from twisted.scripts._twistd_unix import daemonize
24 from twisted.scripts._twistd_unix import daemonize
25
25
26 from IPython.core import release
26 from IPython.core import release
27 from IPython.external import argparse
27 from IPython.external import argparse
28 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
28 from IPython.config.loader import ArgParseConfigLoader, NoConfigDefault
29 from IPython.utils.importstring import import_item
29 from IPython.utils.importstring import import_item
30
30
31 from IPython.kernel.clusterdir import (
31 from IPython.kernel.clusterdir import (
32 ApplicationWithClusterDir, ClusterDirError, PIDFileError
32 ApplicationWithClusterDir, ClusterDirError, PIDFileError
33 )
33 )
34
34
35 from twisted.internet import reactor, defer
35 from twisted.internet import reactor, defer
36 from twisted.python import log, failure
36 from twisted.python import log, failure
37
37
38
38
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40 # The ipcluster application
40 # The ipcluster application
41 #-----------------------------------------------------------------------------
41 #-----------------------------------------------------------------------------
42
42
43
43
44 # Exit codes for ipcluster
44 # Exit codes for ipcluster
45
45
46 # This will be the exit code if the ipcluster appears to be running because
46 # This will be the exit code if the ipcluster appears to be running because
47 # a .pid file exists
47 # a .pid file exists
48 ALREADY_STARTED = 10
48 ALREADY_STARTED = 10
49
49
50 # This will be the exit code if ipcluster stop is run, but there is not .pid
50 # This will be the exit code if ipcluster stop is run, but there is not .pid
51 # file to be found.
51 # file to be found.
52 ALREADY_STOPPED = 11
52 ALREADY_STOPPED = 11
53
53
54
54
55 class IPClusterCLLoader(ArgParseConfigLoader):
55 class IPClusterCLLoader(ArgParseConfigLoader):
56
56
57 def _add_arguments(self):
57 def _add_arguments(self):
58 # This has all the common options that all subcommands use
58 # This has all the common options that all subcommands use
59 parent_parser1 = argparse.ArgumentParser(add_help=False)
59 parent_parser1 = argparse.ArgumentParser(add_help=False)
60 parent_parser1.add_argument('--ipython-dir',
60 parent_parser1.add_argument('--ipython-dir',
61 dest='Global.ipython_dir',type=unicode,
61 dest='Global.ipython_dir',type=unicode,
62 help='Set to override default location of Global.ipython_dir.',
62 help='Set to override default location of Global.ipython_dir.',
63 default=NoConfigDefault,
63 default=NoConfigDefault,
64 metavar='Global.ipython_dir')
64 metavar='Global.ipython_dir')
65 parent_parser1.add_argument('--log-level',
65 parent_parser1.add_argument('--log-level',
66 dest="Global.log_level",type=int,
66 dest="Global.log_level",type=int,
67 help='Set the log level (0,10,20,30,40,50). Default is 30.',
67 help='Set the log level (0,10,20,30,40,50). Default is 30.',
68 default=NoConfigDefault,
68 default=NoConfigDefault,
69 metavar='Global.log_level')
69 metavar='Global.log_level')
70
70
71 # This has all the common options that other subcommands use
71 # This has all the common options that other subcommands use
72 parent_parser2 = argparse.ArgumentParser(add_help=False)
72 parent_parser2 = argparse.ArgumentParser(add_help=False)
73 parent_parser2.add_argument('-p','--profile',
73 parent_parser2.add_argument('-p','--profile',
74 dest='Global.profile',type=unicode,
74 dest='Global.profile',type=unicode,
75 default=NoConfigDefault,
75 default=NoConfigDefault,
76 help='The string name of the profile to be used. This determines '
76 help='The string name of the profile to be used. This determines '
77 'the name of the cluster dir as: cluster_<profile>. The default profile '
77 'the name of the cluster dir as: cluster_<profile>. The default profile '
78 'is named "default". The cluster directory is resolve this way '
78 'is named "default". The cluster directory is resolve this way '
79 'if the --cluster-dir option is not used.',
79 'if the --cluster-dir option is not used.',
80 default=NoConfigDefault,
80 default=NoConfigDefault,
81 metavar='Global.profile')
81 metavar='Global.profile')
82 parent_parser2.add_argument('--cluster-dir',
82 parent_parser2.add_argument('--cluster-dir',
83 dest='Global.cluster_dir',type=unicode,
83 dest='Global.cluster_dir',type=unicode,
84 default=NoConfigDefault,
84 default=NoConfigDefault,
85 help='Set the cluster dir. This overrides the logic used by the '
85 help='Set the cluster dir. This overrides the logic used by the '
86 '--profile option.',
86 '--profile option.',
87 default=NoConfigDefault,
87 default=NoConfigDefault,
88 metavar='Global.cluster_dir'),
88 metavar='Global.cluster_dir'),
89 parent_parser2.add_argument('--work-dir',
89 parent_parser2.add_argument('--work-dir',
90 dest='Global.work_dir',type=unicode,
90 dest='Global.work_dir',type=unicode,
91 help='Set the working dir for the process.',
91 help='Set the working dir for the process.',
92 default=NoConfigDefault,
92 default=NoConfigDefault,
93 metavar='Global.work_dir')
93 metavar='Global.work_dir')
94 parent_parser2.add_argument('--log-to-file',
94 parent_parser2.add_argument('--log-to-file',
95 action='store_true', dest='Global.log_to_file',
95 action='store_true', dest='Global.log_to_file',
96 default=NoConfigDefault,
96 default=NoConfigDefault,
97 help='Log to a file in the log directory (default is stdout)'
97 help='Log to a file in the log directory (default is stdout)'
98 )
98 )
99
99
100 subparsers = self.parser.add_subparsers(
100 subparsers = self.parser.add_subparsers(
101 dest='Global.subcommand',
101 dest='Global.subcommand',
102 title='ipcluster subcommands',
102 title='ipcluster subcommands',
103 description='ipcluster has a variety of subcommands. '
103 description='ipcluster has a variety of subcommands. '
104 'The general way of running ipcluster is "ipcluster <cmd> '
104 'The general way of running ipcluster is "ipcluster <cmd> '
105 ' [options]""',
105 ' [options]""',
106 help='For more help, type "ipcluster <cmd> -h"')
106 help='For more help, type "ipcluster <cmd> -h"')
107
107
108 parser_list = subparsers.add_parser(
108 parser_list = subparsers.add_parser(
109 'list',
109 'list',
110 help='List all clusters in cwd and ipython_dir.',
110 help='List all clusters in cwd and ipython_dir.',
111 parents=[parent_parser1]
111 parents=[parent_parser1]
112 )
112 )
113
113
114 parser_create = subparsers.add_parser(
114 parser_create = subparsers.add_parser(
115 'create',
115 'create',
116 help='Create a new cluster directory.',
116 help='Create a new cluster directory.',
117 parents=[parent_parser1, parent_parser2]
117 parents=[parent_parser1, parent_parser2]
118 )
118 )
119 parser_create.add_argument(
119 parser_create.add_argument(
120 '--reset-config',
120 '--reset-config',
121 dest='Global.reset_config', action='store_true',
121 dest='Global.reset_config', action='store_true',
122 default=NoConfigDefault,
122 default=NoConfigDefault,
123 help='Recopy the default config files to the cluster directory. '
123 help='Recopy the default config files to the cluster directory. '
124 'You will loose any modifications you have made to these files.'
124 'You will loose any modifications you have made to these files.'
125 )
125 )
126
126
127 parser_start = subparsers.add_parser(
127 parser_start = subparsers.add_parser(
128 'start',
128 'start',
129 help='Start a cluster.',
129 help='Start a cluster.',
130 parents=[parent_parser1, parent_parser2]
130 parents=[parent_parser1, parent_parser2]
131 )
131 )
132 parser_start.add_argument(
132 parser_start.add_argument(
133 '-n', '--number',
133 '-n', '--number',
134 type=int, dest='Global.n',
134 type=int, dest='Global.n',
135 default=NoConfigDefault,
135 default=NoConfigDefault,
136 help='The number of engines to start.',
136 help='The number of engines to start.',
137 metavar='Global.n'
137 metavar='Global.n'
138 )
138 )
139 parser_start.add_argument('--clean-logs',
139 parser_start.add_argument('--clean-logs',
140 dest='Global.clean_logs', action='store_true',
140 dest='Global.clean_logs', action='store_true',
141 help='Delete old log flies before starting.',
141 help='Delete old log flies before starting.',
142 default=NoConfigDefault
142 default=NoConfigDefault
143 )
143 )
144 parser_start.add_argument('--no-clean-logs',
144 parser_start.add_argument('--no-clean-logs',
145 dest='Global.clean_logs', action='store_false',
145 dest='Global.clean_logs', action='store_false',
146 help="Don't delete old log flies before starting.",
146 help="Don't delete old log flies before starting.",
147 default=NoConfigDefault
147 default=NoConfigDefault
148 )
148 )
149 parser_start.add_argument('--daemon',
149 parser_start.add_argument('--daemon',
150 dest='Global.daemonize', action='store_true',
150 dest='Global.daemonize', action='store_true',
151 help='Daemonize the ipcluster program. This implies --log-to-file',
151 help='Daemonize the ipcluster program. This implies --log-to-file',
152 default=NoConfigDefault
152 default=NoConfigDefault
153 )
153 )
154 parser_start.add_argument('--no-daemon',
154 parser_start.add_argument('--no-daemon',
155 dest='Global.daemonize', action='store_false',
155 dest='Global.daemonize', action='store_false',
156 help="Dont't daemonize the ipcluster program.",
156 help="Dont't daemonize the ipcluster program.",
157 default=NoConfigDefault
157 default=NoConfigDefault
158 )
158 )
159
159
160 parser_start = subparsers.add_parser(
160 parser_start = subparsers.add_parser(
161 'stop',
161 'stop',
162 help='Stop a cluster.',
162 help='Stop a cluster.',
163 parents=[parent_parser1, parent_parser2]
163 parents=[parent_parser1, parent_parser2]
164 )
164 )
165 parser_start.add_argument('--signal',
165 parser_start.add_argument('--signal',
166 dest='Global.signal', type=int,
166 dest='Global.signal', type=int,
167 help="The signal number to use in stopping the cluster (default=2).",
167 help="The signal number to use in stopping the cluster (default=2).",
168 metavar="Global.signal",
168 metavar="Global.signal",
169 default=NoConfigDefault
169 default=NoConfigDefault
170 )
170 )
171
171
172
172
173 default_config_file_name = u'ipcluster_config.py'
173 default_config_file_name = u'ipcluster_config.py'
174
174
175
175
176 class IPClusterApp(ApplicationWithClusterDir):
176 class IPClusterApp(ApplicationWithClusterDir):
177
177
178 name = u'ipcluster'
178 name = u'ipcluster'
179 description = 'Start an IPython cluster (controller and engines).'
179 description = 'Start an IPython cluster (controller and engines).'
180 config_file_name = default_config_file_name
180 config_file_name = default_config_file_name
181 default_log_level = logging.INFO
181 default_log_level = logging.INFO
182 auto_create_cluster_dir = False
182 auto_create_cluster_dir = False
183
183
184 def create_default_config(self):
184 def create_default_config(self):
185 super(IPClusterApp, self).create_default_config()
185 super(IPClusterApp, self).create_default_config()
186 self.default_config.Global.controller_launcher = \
186 self.default_config.Global.controller_launcher = \
187 'IPython.kernel.launcher.LocalControllerLauncher'
187 'IPython.kernel.launcher.LocalControllerLauncher'
188 self.default_config.Global.engine_launcher = \
188 self.default_config.Global.engine_launcher = \
189 'IPython.kernel.launcher.LocalEngineSetLauncher'
189 'IPython.kernel.launcher.LocalEngineSetLauncher'
190 self.default_config.Global.n = 2
190 self.default_config.Global.n = 2
191 self.default_config.Global.reset_config = False
191 self.default_config.Global.reset_config = False
192 self.default_config.Global.clean_logs = True
192 self.default_config.Global.clean_logs = True
193 self.default_config.Global.signal = 2
193 self.default_config.Global.signal = 2
194 self.default_config.Global.daemonize = False
194 self.default_config.Global.daemonize = False
195
195
196 def create_command_line_config(self):
196 def create_command_line_config(self):
197 """Create and return a command line config loader."""
197 """Create and return a command line config loader."""
198 return IPClusterCLLoader(
198 return IPClusterCLLoader(
199 description=self.description,
199 description=self.description,
200 version=release.version
200 version=release.version
201 )
201 )
202
202
203 def find_resources(self):
203 def find_resources(self):
204 subcommand = self.command_line_config.Global.subcommand
204 subcommand = self.command_line_config.Global.subcommand
205 if subcommand=='list':
205 if subcommand=='list':
206 self.list_cluster_dirs()
206 self.list_cluster_dirs()
207 # Exit immediately because there is nothing left to do.
207 # Exit immediately because there is nothing left to do.
208 self.exit()
208 self.exit()
209 elif subcommand=='create':
209 elif subcommand=='create':
210 self.auto_create_cluster_dir = True
210 self.auto_create_cluster_dir = True
211 super(IPClusterApp, self).find_resources()
211 super(IPClusterApp, self).find_resources()
212 elif subcommand=='start' or subcommand=='stop':
212 elif subcommand=='start' or subcommand=='stop':
213 self.auto_create_cluster_dir = False
213 self.auto_create_cluster_dir = False
214 try:
214 try:
215 super(IPClusterApp, self).find_resources()
215 super(IPClusterApp, self).find_resources()
216 except ClusterDirError:
216 except ClusterDirError:
217 raise ClusterDirError(
217 raise ClusterDirError(
218 "Could not find a cluster directory. A cluster dir must "
218 "Could not find a cluster directory. A cluster dir must "
219 "be created before running 'ipcluster start'. Do "
219 "be created before running 'ipcluster start'. Do "
220 "'ipcluster create -h' or 'ipcluster list -h' for more "
220 "'ipcluster create -h' or 'ipcluster list -h' for more "
221 "information about creating and listing cluster dirs."
221 "information about creating and listing cluster dirs."
222 )
222 )
223
223
224 def list_cluster_dirs(self):
224 def list_cluster_dirs(self):
225 # Find the search paths
225 # Find the search paths
226 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
226 cluster_dir_paths = os.environ.get('IPCLUSTER_DIR_PATH','')
227 if cluster_dir_paths:
227 if cluster_dir_paths:
228 cluster_dir_paths = cluster_dir_paths.split(':')
228 cluster_dir_paths = cluster_dir_paths.split(':')
229 else:
229 else:
230 cluster_dir_paths = []
230 cluster_dir_paths = []
231 try:
231 try:
232 ipython_dir = self.command_line_config.Global.ipython_dir
232 ipython_dir = self.command_line_config.Global.ipython_dir
233 except AttributeError:
233 except AttributeError:
234 ipython_dir = self.default_config.Global.ipython_dir
234 ipython_dir = self.default_config.Global.ipython_dir
235 paths = [os.getcwd(), ipython_dir] + \
235 paths = [os.getcwd(), ipython_dir] + \
236 cluster_dir_paths
236 cluster_dir_paths
237 paths = list(set(paths))
237 paths = list(set(paths))
238
238
239 self.log.info('Searching for cluster dirs in paths: %r' % paths)
239 self.log.info('Searching for cluster dirs in paths: %r' % paths)
240 for path in paths:
240 for path in paths:
241 files = os.listdir(path)
241 files = os.listdir(path)
242 for f in files:
242 for f in files:
243 full_path = os.path.join(path, f)
243 full_path = os.path.join(path, f)
244 if os.path.isdir(full_path) and f.startswith('cluster_'):
244 if os.path.isdir(full_path) and f.startswith('cluster_'):
245 profile = full_path.split('_')[-1]
245 profile = full_path.split('_')[-1]
246 start_cmd = '"ipcluster start -n 4 -p %s"' % profile
246 start_cmd = 'ipcluster start -p %s -n 4' % profile
247 print start_cmd + " ==> " + full_path
247 print start_cmd + " ==> " + full_path
248
248
249 def pre_construct(self):
249 def pre_construct(self):
250 # IPClusterApp.pre_construct() is where we cd to the working directory.
250 # IPClusterApp.pre_construct() is where we cd to the working directory.
251 super(IPClusterApp, self).pre_construct()
251 super(IPClusterApp, self).pre_construct()
252 config = self.master_config
252 config = self.master_config
253 try:
253 try:
254 daemon = config.Global.daemonize
254 daemon = config.Global.daemonize
255 if daemon:
255 if daemon:
256 config.Global.log_to_file = True
256 config.Global.log_to_file = True
257 except AttributeError:
257 except AttributeError:
258 pass
258 pass
259
259
260 def construct(self):
260 def construct(self):
261 config = self.master_config
261 config = self.master_config
262 if config.Global.subcommand=='list':
262 if config.Global.subcommand=='list':
263 pass
263 pass
264 elif config.Global.subcommand=='create':
264 elif config.Global.subcommand=='create':
265 self.log.info('Copying default config files to cluster directory '
265 self.log.info('Copying default config files to cluster directory '
266 '[overwrite=%r]' % (config.Global.reset_config,))
266 '[overwrite=%r]' % (config.Global.reset_config,))
267 self.cluster_dir_obj.copy_all_config_files(overwrite=config.Global.reset_config)
267 self.cluster_dir_obj.copy_all_config_files(overwrite=config.Global.reset_config)
268 elif config.Global.subcommand=='start':
268 elif config.Global.subcommand=='start':
269 self.start_logging()
269 self.start_logging()
270 reactor.callWhenRunning(self.start_launchers)
270 reactor.callWhenRunning(self.start_launchers)
271
271
272 def start_launchers(self):
272 def start_launchers(self):
273 config = self.master_config
273 config = self.master_config
274
274
275 # Create the launchers. In both bases, we set the work_dir of
275 # Create the launchers. In both bases, we set the work_dir of
276 # the launcher to the cluster_dir. This is where the launcher's
276 # the launcher to the cluster_dir. This is where the launcher's
277 # subprocesses will be launched. It is not where the controller
277 # subprocesses will be launched. It is not where the controller
278 # and engine will be launched.
278 # and engine will be launched.
279 el_class = import_item(config.Global.engine_launcher)
279 el_class = import_item(config.Global.engine_launcher)
280 self.engine_launcher = el_class(
280 self.engine_launcher = el_class(
281 work_dir=self.cluster_dir, config=config
281 work_dir=self.cluster_dir, config=config
282 )
282 )
283 cl_class = import_item(config.Global.controller_launcher)
283 cl_class = import_item(config.Global.controller_launcher)
284 self.controller_launcher = cl_class(
284 self.controller_launcher = cl_class(
285 work_dir=self.cluster_dir, config=config
285 work_dir=self.cluster_dir, config=config
286 )
286 )
287
287
288 # Setup signals
288 # Setup signals
289 signal.signal(signal.SIGINT, self.sigint_handler)
289 signal.signal(signal.SIGINT, self.sigint_handler)
290
290
291 # Setup the observing of stopping. If the controller dies, shut
291 # Setup the observing of stopping. If the controller dies, shut
292 # everything down as that will be completely fatal for the engines.
292 # everything down as that will be completely fatal for the engines.
293 d1 = self.controller_launcher.observe_stop()
293 d1 = self.controller_launcher.observe_stop()
294 d1.addCallback(self.stop_launchers)
294 d1.addCallback(self.stop_launchers)
295 # But, we don't monitor the stopping of engines. An engine dying
295 # But, we don't monitor the stopping of engines. An engine dying
296 # is just fine and in principle a user could start a new engine.
296 # is just fine and in principle a user could start a new engine.
297 # Also, if we did monitor engine stopping, it is difficult to
297 # Also, if we did monitor engine stopping, it is difficult to
298 # know what to do when only some engines die. Currently, the
298 # know what to do when only some engines die. Currently, the
299 # observing of engine stopping is inconsistent. Some launchers
299 # observing of engine stopping is inconsistent. Some launchers
300 # might trigger on a single engine stopping, other wait until
300 # might trigger on a single engine stopping, other wait until
301 # all stop. TODO: think more about how to handle this.
301 # all stop. TODO: think more about how to handle this.
302
302
303 # Start the controller and engines
303 # Start the controller and engines
304 self._stopping = False # Make sure stop_launchers is not called 2x.
304 self._stopping = False # Make sure stop_launchers is not called 2x.
305 d = self.start_controller()
305 d = self.start_controller()
306 d.addCallback(self.start_engines)
306 d.addCallback(self.start_engines)
307 d.addCallback(self.startup_message)
307 d.addCallback(self.startup_message)
308 # If the controller or engines fail to start, stop everything
308 # If the controller or engines fail to start, stop everything
309 d.addErrback(self.stop_launchers)
309 d.addErrback(self.stop_launchers)
310 return d
310 return d
311
311
312 def startup_message(self, r=None):
312 def startup_message(self, r=None):
313 log.msg("IPython cluster: started")
313 log.msg("IPython cluster: started")
314 return r
314 return r
315
315
316 def start_controller(self, r=None):
316 def start_controller(self, r=None):
317 # log.msg("In start_controller")
317 # log.msg("In start_controller")
318 config = self.master_config
318 config = self.master_config
319 d = self.controller_launcher.start(
319 d = self.controller_launcher.start(
320 cluster_dir=config.Global.cluster_dir
320 cluster_dir=config.Global.cluster_dir
321 )
321 )
322 return d
322 return d
323
323
324 def start_engines(self, r=None):
324 def start_engines(self, r=None):
325 # log.msg("In start_engines")
325 # log.msg("In start_engines")
326 config = self.master_config
326 config = self.master_config
327 d = self.engine_launcher.start(
327 d = self.engine_launcher.start(
328 config.Global.n,
328 config.Global.n,
329 cluster_dir=config.Global.cluster_dir
329 cluster_dir=config.Global.cluster_dir
330 )
330 )
331 return d
331 return d
332
332
333 def stop_controller(self, r=None):
333 def stop_controller(self, r=None):
334 # log.msg("In stop_controller")
334 # log.msg("In stop_controller")
335 if self.controller_launcher.running:
335 if self.controller_launcher.running:
336 d = self.controller_launcher.stop()
336 d = self.controller_launcher.stop()
337 d.addErrback(self.log_err)
337 d.addErrback(self.log_err)
338 return d
338 return d
339 else:
339 else:
340 return defer.succeed(None)
340 return defer.succeed(None)
341
341
342 def stop_engines(self, r=None):
342 def stop_engines(self, r=None):
343 # log.msg("In stop_engines")
343 # log.msg("In stop_engines")
344 if self.engine_launcher.running:
344 if self.engine_launcher.running:
345 d = self.engine_launcher.stop()
345 d = self.engine_launcher.stop()
346 d.addErrback(self.log_err)
346 d.addErrback(self.log_err)
347 return d
347 return d
348 else:
348 else:
349 return defer.succeed(None)
349 return defer.succeed(None)
350
350
351 def log_err(self, f):
351 def log_err(self, f):
352 log.msg(f.getTraceback())
352 log.msg(f.getTraceback())
353 return None
353 return None
354
354
355 def stop_launchers(self, r=None):
355 def stop_launchers(self, r=None):
356 if not self._stopping:
356 if not self._stopping:
357 self._stopping = True
357 self._stopping = True
358 if isinstance(r, failure.Failure):
358 if isinstance(r, failure.Failure):
359 log.msg('Unexpected error in ipcluster:')
359 log.msg('Unexpected error in ipcluster:')
360 log.msg(r.getTraceback())
360 log.msg(r.getTraceback())
361 log.msg("IPython cluster: stopping")
361 log.msg("IPython cluster: stopping")
362 d= self.stop_engines()
362 d= self.stop_engines()
363 d2 = self.stop_controller()
363 d2 = self.stop_controller()
364 # Wait a few seconds to let things shut down.
364 # Wait a few seconds to let things shut down.
365 reactor.callLater(3.0, reactor.stop)
365 reactor.callLater(4.0, reactor.stop)
366
366
367 def sigint_handler(self, signum, frame):
367 def sigint_handler(self, signum, frame):
368 self.stop_launchers()
368 self.stop_launchers()
369
369
370 def start_logging(self):
370 def start_logging(self):
371 # Remove old log files of the controller and engine
371 # Remove old log files of the controller and engine
372 if self.master_config.Global.clean_logs:
372 if self.master_config.Global.clean_logs:
373 log_dir = self.master_config.Global.log_dir
373 log_dir = self.master_config.Global.log_dir
374 for f in os.listdir(log_dir):
374 for f in os.listdir(log_dir):
375 if f.startswith('ipengine' + '-'):
375 if f.startswith('ipengine' + '-'):
376 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
376 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
377 os.remove(os.path.join(log_dir, f))
377 os.remove(os.path.join(log_dir, f))
378 if f.startswith('ipcontroller' + '-'):
378 if f.startswith('ipcontroller' + '-'):
379 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
379 if f.endswith('.log') or f.endswith('.out') or f.endswith('.err'):
380 os.remove(os.path.join(log_dir, f))
380 os.remove(os.path.join(log_dir, f))
381 # This will remote old log files for ipcluster itself
381 # This will remote old log files for ipcluster itself
382 super(IPClusterApp, self).start_logging()
382 super(IPClusterApp, self).start_logging()
383
383
384 def start_app(self):
384 def start_app(self):
385 """Start the application, depending on what subcommand is used."""
385 """Start the application, depending on what subcommand is used."""
386 subcmd = self.master_config.Global.subcommand
386 subcmd = self.master_config.Global.subcommand
387 if subcmd=='create' or subcmd=='list':
387 if subcmd=='create' or subcmd=='list':
388 return
388 return
389 elif subcmd=='start':
389 elif subcmd=='start':
390 self.start_app_start()
390 self.start_app_start()
391 elif subcmd=='stop':
391 elif subcmd=='stop':
392 self.start_app_stop()
392 self.start_app_stop()
393
393
394 def start_app_start(self):
394 def start_app_start(self):
395 """Start the app for the start subcommand."""
395 """Start the app for the start subcommand."""
396 config = self.master_config
396 config = self.master_config
397 # First see if the cluster is already running
397 # First see if the cluster is already running
398 try:
398 try:
399 pid = self.get_pid_from_file()
399 pid = self.get_pid_from_file()
400 except PIDFileError:
400 except PIDFileError:
401 pass
401 pass
402 else:
402 else:
403 self.log.critical(
403 self.log.critical(
404 'Cluster is already running with [pid=%s]. '
404 'Cluster is already running with [pid=%s]. '
405 'use "ipcluster stop" to stop the cluster.' % pid
405 'use "ipcluster stop" to stop the cluster.' % pid
406 )
406 )
407 # Here I exit with a unusual exit status that other processes
407 # Here I exit with a unusual exit status that other processes
408 # can watch for to learn how I existed.
408 # can watch for to learn how I existed.
409 self.exit(ALREADY_STARTED)
409 self.exit(ALREADY_STARTED)
410
410
411 # Now log and daemonize
411 # Now log and daemonize
412 self.log.info(
412 self.log.info(
413 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
413 'Starting ipcluster with [daemon=%r]' % config.Global.daemonize
414 )
414 )
415 if config.Global.daemonize:
415 if config.Global.daemonize:
416 if os.name=='posix':
416 if os.name=='posix':
417 daemonize()
417 daemonize()
418
418
419 # Now write the new pid file AFTER our new forked pid is active.
419 # Now write the new pid file AFTER our new forked pid is active.
420 self.write_pid_file()
420 self.write_pid_file()
421 reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
421 reactor.addSystemEventTrigger('during','shutdown', self.remove_pid_file)
422 reactor.run()
422 reactor.run()
423
423
424 def start_app_stop(self):
424 def start_app_stop(self):
425 """Start the app for the stop subcommand."""
425 """Start the app for the stop subcommand."""
426 config = self.master_config
426 config = self.master_config
427 try:
427 try:
428 pid = self.get_pid_from_file()
428 pid = self.get_pid_from_file()
429 except PIDFileError:
429 except PIDFileError:
430 self.log.critical(
430 self.log.critical(
431 'Problem reading pid file, cluster is probably not running.'
431 'Problem reading pid file, cluster is probably not running.'
432 )
432 )
433 # Here I exit with a unusual exit status that other processes
433 # Here I exit with a unusual exit status that other processes
434 # can watch for to learn how I existed.
434 # can watch for to learn how I existed.
435 self.exit(ALREADY_STOPPED)
435 self.exit(ALREADY_STOPPED)
436 else:
436 else:
437 if os.name=='posix':
437 if os.name=='posix':
438 sig = config.Global.signal
438 sig = config.Global.signal
439 self.log.info(
439 self.log.info(
440 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
440 "Stopping cluster [pid=%r] with [signal=%r]" % (pid, sig)
441 )
441 )
442 os.kill(pid, sig)
442 os.kill(pid, sig)
443 elif os.name=='nt':
443 elif os.name=='nt':
444 # As of right now, we don't support daemonize on Windows, so
444 # As of right now, we don't support daemonize on Windows, so
445 # stop will not do anything. Minimally, it should clean up the
445 # stop will not do anything. Minimally, it should clean up the
446 # old .pid files.
446 # old .pid files.
447 self.remove_pid_file()
447 self.remove_pid_file()
448
448
449 def launch_new_instance():
449 def launch_new_instance():
450 """Create and run the IPython cluster."""
450 """Create and run the IPython cluster."""
451 app = IPClusterApp()
451 app = IPClusterApp()
452 app.start()
452 app.start()
453
453
454
454
455 if __name__ == '__main__':
455 if __name__ == '__main__':
456 launch_new_instance()
456 launch_new_instance()
457
457
@@ -1,753 +1,752 b''
1 # encoding: utf-8
1 # encoding: utf-8
2 # -*- test-case-name: IPython.kernel.test.test_multiengine -*-
2 # -*- test-case-name: IPython.kernel.test.test_multiengine -*-
3
3
4 """Adapt the IPython ControllerServer to IMultiEngine.
4 """Adapt the IPython ControllerServer to IMultiEngine.
5
5
6 This module provides classes that adapt a ControllerService to the
6 This module provides classes that adapt a ControllerService to the
7 IMultiEngine interface. This interface is a basic interactive interface
7 IMultiEngine interface. This interface is a basic interactive interface
8 for working with a set of engines where it is desired to have explicit
8 for working with a set of engines where it is desired to have explicit
9 access to each registered engine.
9 access to each registered engine.
10
10
11 The classes here are exposed to the network in files like:
11 The classes here are exposed to the network in files like:
12
12
13 * multienginevanilla.py
13 * multienginevanilla.py
14 * multienginepb.py
14 * multienginepb.py
15 """
15 """
16
16
17 __docformat__ = "restructuredtext en"
17 __docformat__ = "restructuredtext en"
18
18
19 #-------------------------------------------------------------------------------
19 #-------------------------------------------------------------------------------
20 # Copyright (C) 2008 The IPython Development Team
20 # Copyright (C) 2008 The IPython Development Team
21 #
21 #
22 # Distributed under the terms of the BSD License. The full license is in
22 # Distributed under the terms of the BSD License. The full license is in
23 # the file COPYING, distributed as part of this software.
23 # the file COPYING, distributed as part of this software.
24 #-------------------------------------------------------------------------------
24 #-------------------------------------------------------------------------------
25
25
26 #-------------------------------------------------------------------------------
26 #-------------------------------------------------------------------------------
27 # Imports
27 # Imports
28 #-------------------------------------------------------------------------------
28 #-------------------------------------------------------------------------------
29
29
30 from new import instancemethod
30 from new import instancemethod
31 from types import FunctionType
31 from types import FunctionType
32
32
33 from twisted.application import service
33 from twisted.application import service
34 from twisted.internet import defer, reactor
34 from twisted.internet import defer, reactor
35 from twisted.python import log, components, failure
35 from twisted.python import log, components, failure
36 from zope.interface import Interface, implements, Attribute
36 from zope.interface import Interface, implements, Attribute
37
37
38 from IPython.utils import growl
38 from IPython.utils import growl
39 from IPython.kernel.util import printer
39 from IPython.kernel.util import printer
40 from IPython.kernel.twistedutil import gatherBoth
40 from IPython.kernel.twistedutil import gatherBoth
41 from IPython.kernel import map as Map
41 from IPython.kernel import map as Map
42 from IPython.kernel import error
42 from IPython.kernel import error
43 from IPython.kernel.pendingdeferred import PendingDeferredManager, two_phase
43 from IPython.kernel.pendingdeferred import PendingDeferredManager, two_phase
44 from IPython.kernel.controllerservice import \
44 from IPython.kernel.controllerservice import \
45 ControllerAdapterBase, \
45 ControllerAdapterBase, \
46 ControllerService, \
46 ControllerService, \
47 IControllerBase
47 IControllerBase
48
48
49
49
50 #-------------------------------------------------------------------------------
50 #-------------------------------------------------------------------------------
51 # Interfaces for the MultiEngine representation of a controller
51 # Interfaces for the MultiEngine representation of a controller
52 #-------------------------------------------------------------------------------
52 #-------------------------------------------------------------------------------
53
53
54 class IEngineMultiplexer(Interface):
54 class IEngineMultiplexer(Interface):
55 """Interface to multiple engines implementing IEngineCore/Serialized/Queued.
55 """Interface to multiple engines implementing IEngineCore/Serialized/Queued.
56
56
57 This class simply acts as a multiplexer of methods that are in the
57 This class simply acts as a multiplexer of methods that are in the
58 various IEngines* interfaces. Thus the methods here are jut like those
58 various IEngines* interfaces. Thus the methods here are jut like those
59 in the IEngine* interfaces, but with an extra first argument, targets.
59 in the IEngine* interfaces, but with an extra first argument, targets.
60 The targets argument can have the following forms:
60 The targets argument can have the following forms:
61
61
62 * targets = 10 # Engines are indexed by ints
62 * targets = 10 # Engines are indexed by ints
63 * targets = [0,1,2,3] # A list of ints
63 * targets = [0,1,2,3] # A list of ints
64 * targets = 'all' # A string to indicate all targets
64 * targets = 'all' # A string to indicate all targets
65
65
66 If targets is bad in any way, an InvalidEngineID will be raised. This
66 If targets is bad in any way, an InvalidEngineID will be raised. This
67 includes engines not being registered.
67 includes engines not being registered.
68
68
69 All IEngineMultiplexer multiplexer methods must return a Deferred to a list
69 All IEngineMultiplexer multiplexer methods must return a Deferred to a list
70 with length equal to the number of targets. The elements of the list will
70 with length equal to the number of targets. The elements of the list will
71 correspond to the return of the corresponding IEngine method.
71 correspond to the return of the corresponding IEngine method.
72
72
73 Failures are aggressive, meaning that if an action fails for any target,
73 Failures are aggressive, meaning that if an action fails for any target,
74 the overall action will fail immediately with that Failure.
74 the overall action will fail immediately with that Failure.
75
75
76 :Parameters:
76 :Parameters:
77 targets : int, list of ints, or 'all'
77 targets : int, list of ints, or 'all'
78 Engine ids the action will apply to.
78 Engine ids the action will apply to.
79
79
80 :Returns: Deferred to a list of results for each engine.
80 :Returns: Deferred to a list of results for each engine.
81
81
82 :Exception:
82 :Exception:
83 InvalidEngineID
83 InvalidEngineID
84 If the targets argument is bad or engines aren't registered.
84 If the targets argument is bad or engines aren't registered.
85 NoEnginesRegistered
85 NoEnginesRegistered
86 If there are no engines registered and targets='all'
86 If there are no engines registered and targets='all'
87 """
87 """
88
88
89 #---------------------------------------------------------------------------
89 #---------------------------------------------------------------------------
90 # Mutiplexed methods
90 # Mutiplexed methods
91 #---------------------------------------------------------------------------
91 #---------------------------------------------------------------------------
92
92
93 def execute(lines, targets='all'):
93 def execute(lines, targets='all'):
94 """Execute lines of Python code on targets.
94 """Execute lines of Python code on targets.
95
95
96 See the class docstring for information about targets and possible
96 See the class docstring for information about targets and possible
97 exceptions this method can raise.
97 exceptions this method can raise.
98
98
99 :Parameters:
99 :Parameters:
100 lines : str
100 lines : str
101 String of python code to be executed on targets.
101 String of python code to be executed on targets.
102 """
102 """
103
103
104 def push(namespace, targets='all'):
104 def push(namespace, targets='all'):
105 """Push dict namespace into the user's namespace on targets.
105 """Push dict namespace into the user's namespace on targets.
106
106
107 See the class docstring for information about targets and possible
107 See the class docstring for information about targets and possible
108 exceptions this method can raise.
108 exceptions this method can raise.
109
109
110 :Parameters:
110 :Parameters:
111 namspace : dict
111 namspace : dict
112 Dict of key value pairs to be put into the users namspace.
112 Dict of key value pairs to be put into the users namspace.
113 """
113 """
114
114
115 def pull(keys, targets='all'):
115 def pull(keys, targets='all'):
116 """Pull values out of the user's namespace on targets by keys.
116 """Pull values out of the user's namespace on targets by keys.
117
117
118 See the class docstring for information about targets and possible
118 See the class docstring for information about targets and possible
119 exceptions this method can raise.
119 exceptions this method can raise.
120
120
121 :Parameters:
121 :Parameters:
122 keys : tuple of strings
122 keys : tuple of strings
123 Sequence of keys to be pulled from user's namespace.
123 Sequence of keys to be pulled from user's namespace.
124 """
124 """
125
125
126 def push_function(namespace, targets='all'):
126 def push_function(namespace, targets='all'):
127 """"""
127 """"""
128
128
129 def pull_function(keys, targets='all'):
129 def pull_function(keys, targets='all'):
130 """"""
130 """"""
131
131
132 def get_result(i=None, targets='all'):
132 def get_result(i=None, targets='all'):
133 """Get the result for command i from targets.
133 """Get the result for command i from targets.
134
134
135 See the class docstring for information about targets and possible
135 See the class docstring for information about targets and possible
136 exceptions this method can raise.
136 exceptions this method can raise.
137
137
138 :Parameters:
138 :Parameters:
139 i : int or None
139 i : int or None
140 Command index or None to indicate most recent command.
140 Command index or None to indicate most recent command.
141 """
141 """
142
142
143 def reset(targets='all'):
143 def reset(targets='all'):
144 """Reset targets.
144 """Reset targets.
145
145
146 This clears the users namespace of the Engines, but won't cause
146 This clears the users namespace of the Engines, but won't cause
147 modules to be reloaded.
147 modules to be reloaded.
148 """
148 """
149
149
150 def keys(targets='all'):
150 def keys(targets='all'):
151 """Get variable names defined in user's namespace on targets."""
151 """Get variable names defined in user's namespace on targets."""
152
152
153 def kill(controller=False, targets='all'):
153 def kill(controller=False, targets='all'):
154 """Kill the targets Engines and possibly the controller.
154 """Kill the targets Engines and possibly the controller.
155
155
156 :Parameters:
156 :Parameters:
157 controller : boolean
157 controller : boolean
158 Should the controller be killed as well. If so all the
158 Should the controller be killed as well. If so all the
159 engines will be killed first no matter what targets is.
159 engines will be killed first no matter what targets is.
160 """
160 """
161
161
    def push_serialized(namespace, targets='all'):
        """Push a namespace of Serialized objects to targets.

        :Parameters:
            namespace : dict
                A dict whose keys are the variable names and whose values
                are serialized versions of the objects.
        """
170
170
    def pull_serialized(keys, targets='all'):
        """Pull Serialized objects by keys from targets.

        :Parameters:
            keys : tuple of strings
                Sequence of variable names to pull as serialized objects.
        """
178
178
    def clear_queue(targets='all'):
        """Clear the queue of pending commands for targets."""
181
181
    def queue_status(targets='all'):
        """Get the status of the queue on the targets."""
184
184
    def set_properties(properties, targets='all'):
        """Set properties by key and value on targets.

        :Parameters:
            properties : dict
                Key/value pairs to store in the engines' properties dicts.
        """
187
187
    def get_properties(keys=None, targets='all'):
        """Get a list of properties by `keys`; if no keys specified, get all."""
190
190
    def del_properties(keys, targets='all'):
        """Delete properties by `keys` on targets."""
193
193
    def has_properties(keys, targets='all'):
        """Get a list of bool values for whether `properties` has `keys`."""
196
196
    def clear_properties(targets='all'):
        """Clear the properties dict on targets."""
199
199
200
200
class IMultiEngine(IEngineMultiplexer):
    """A controller that exposes an explicit interface to all of its engines.

    This is the primary interface for interactive usage.
    """

    def get_ids():
        """Return list of currently registered ids.

        :Returns: A Deferred to a list of registered engine ids.
        """
212
212
213
213
214
214
215 #-------------------------------------------------------------------------------
215 #-------------------------------------------------------------------------------
216 # Implementation of the core MultiEngine classes
216 # Implementation of the core MultiEngine classes
217 #-------------------------------------------------------------------------------
217 #-------------------------------------------------------------------------------
218
218
class MultiEngine(ControllerAdapterBase):
    """The representation of a ControllerService as a IMultiEngine.

    Although it is not implemented currently, this class would be where a
    client/notification API is implemented.  It could inherit from something
    like results.NotifierParent and then use the notify method to send
    notifications.
    """

    implements(IMultiEngine)

    def __init__(self, controller):
        # BUG FIX: this was spelled ``__init`` (double leading underscore,
        # name-mangled), so it was never invoked as the constructor; the
        # inherited ControllerAdapterBase.__init__ ran instead, with the same
        # effect only by luck.  Spelled correctly now.
        ControllerAdapterBase.__init__(self, controller)

    #---------------------------------------------------------------------------
    # Helper methods
    #---------------------------------------------------------------------------

    def engineList(self, targets):
        """Parse the targets argument into a list of valid engine objects.

        :Parameters:
            targets : int, list of ints or 'all'
                The targets argument to be parsed.

        :Returns: List of engine objects.

        :Exception:
            InvalidEngineID
                If targets is not valid or if an engine is not registered.
        """
        if isinstance(targets, int):
            if targets not in self.engines.keys():
                log.msg("Engine with id %i is not registered" % targets)
                raise error.InvalidEngineID("Engine with id %i is not registered" % targets)
            else:
                return [self.engines[targets]]
        elif isinstance(targets, (list, tuple)):
            for id in targets:
                if id not in self.engines.keys():
                    log.msg("Engine with id %r is not registered" % id)
                    raise error.InvalidEngineID("Engine with id %r is not registered" % id)
            return map(self.engines.get, targets)
        elif targets == 'all':
            eList = self.engines.values()
            if len(eList) == 0:
                raise error.NoEnginesRegistered("There are no engines registered. "
                    "Check the logs if you think there should have been.")
            else:
                return eList
        else:
            raise error.InvalidEngineID("targets argument is not an int, list of ints or 'all': %r"%targets)

    def _performOnEngines(self, methodName, *args, **kwargs):
        """Calls a method on engines and returns deferred to list of results.

        :Parameters:
            methodName : str
                Name of the method to be called.
            targets : int, list of ints, 'all'
                The targets argument to be parsed into a list of engine objects.
            args
                The positional arguments to be passed to the engines.
            kwargs
                The keyword arguments passed to the method.

        :Returns: List of deferreds to the results on each engine.

        :Exception:
            InvalidEngineID
                If the targets argument is bad in any way.
            AttributeError
                If the method doesn't exist on one of the engines.
        """
        targets = kwargs.pop('targets')
        log.msg("Performing %s on %r" % (methodName, targets))
        # This will and should raise if targets is not valid!
        engines = self.engineList(targets)
        dList = []
        for e in engines:
            meth = getattr(e, methodName, None)
            if meth is not None:
                dList.append(meth(*args, **kwargs))
            else:
                raise AttributeError("Engine %i does not have method %s" % (e.id, methodName))
        return dList

    def _performOnEnginesAndGatherBoth(self, methodName, *args, **kwargs):
        """Calls _performOnEngines and wraps result/exception into deferred."""
        try:
            dList = self._performOnEngines(methodName, *args, **kwargs)
        except (error.InvalidEngineID, AttributeError, KeyError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            # Having fireOnOneErrback is causing problems with the determinacy
            # of the system.  Basically, once a single engine has errbacked, this
            # method returns.  In some cases, this will cause client to submit
            # another command.  Because the previous command is still running
            # on some engines, this command will be queued.  When those commands
            # then errback, the second command will raise QueueCleared.  Ahhh!
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, methodName)
            return d

    #---------------------------------------------------------------------------
    # General IMultiEngine methods
    #---------------------------------------------------------------------------

    def get_ids(self):
        """Return a deferred to the list of registered engine ids."""
        return defer.succeed(self.engines.keys())

    #---------------------------------------------------------------------------
    # IEngineMultiplexer methods
    #---------------------------------------------------------------------------

    def execute(self, lines, targets='all'):
        return self._performOnEnginesAndGatherBoth('execute', lines, targets=targets)

    def push(self, ns, targets='all'):
        return self._performOnEnginesAndGatherBoth('push', ns, targets=targets)

    def pull(self, keys, targets='all'):
        return self._performOnEnginesAndGatherBoth('pull', keys, targets=targets)

    def push_function(self, ns, targets='all'):
        return self._performOnEnginesAndGatherBoth('push_function', ns, targets=targets)

    def pull_function(self, keys, targets='all'):
        return self._performOnEnginesAndGatherBoth('pull_function', keys, targets=targets)

    def get_result(self, i=None, targets='all'):
        return self._performOnEnginesAndGatherBoth('get_result', i, targets=targets)

    def reset(self, targets='all'):
        return self._performOnEnginesAndGatherBoth('reset', targets=targets)

    def keys(self, targets='all'):
        return self._performOnEnginesAndGatherBoth('keys', targets=targets)

    def kill(self, controller=False, targets='all'):
        if controller:
            # Killing the controller implies killing every engine first.
            targets = 'all'
        d = self._performOnEnginesAndGatherBoth('kill', targets=targets)
        if controller:
            log.msg("Killing controller")
            d.addCallback(lambda _: reactor.callLater(2.0, reactor.stop))
            # Consume any weird stuff coming back
            d.addBoth(lambda _: None)
        return d

    def push_serialized(self, namespace, targets='all'):
        for k, v in namespace.iteritems():
            log.msg("Pushed object %s is %f MB" % (k, v.getDataSize()))
        d = self._performOnEnginesAndGatherBoth('push_serialized', namespace, targets=targets)
        return d

    def pull_serialized(self, keys, targets='all'):
        try:
            dList = self._performOnEngines('pull_serialized', keys, targets=targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            for d in dList:
                d.addCallback(self._logSizes)
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'pull_serialized')
            return d

    def _logSizes(self, listOfSerialized):
        """Log the size of each pulled Serialized object; pass the value through."""
        if isinstance(listOfSerialized, (list, tuple)):
            for s in listOfSerialized:
                log.msg("Pulled object is %f MB" % s.getDataSize())
        else:
            log.msg("Pulled object is %f MB" % listOfSerialized.getDataSize())
        return listOfSerialized

    def clear_queue(self, targets='all'):
        return self._performOnEnginesAndGatherBoth('clear_queue', targets=targets)

    def queue_status(self, targets='all'):
        log.msg("Getting queue status on %r" % targets)
        try:
            engines = self.engineList(targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            dList = []
            for e in engines:
                # BUG FIX: bind the engine id now via a default argument.  The
                # original ``lambda s: (e.id, s)`` closed over the loop
                # variable ``e`` late, so by the time the deferreds fired
                # every callback reported the id of the *last* engine.
                dList.append(e.queue_status().addCallback(
                    lambda s, eid=e.id: (eid, s)))
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'queue_status')
            return d

    def get_properties(self, keys=None, targets='all'):
        log.msg("Getting properties on %r" % targets)
        try:
            engines = self.engineList(targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            dList = [e.get_properties(keys) for e in engines]
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'get_properties')
            return d

    def set_properties(self, properties, targets='all'):
        log.msg("Setting properties on %r" % targets)
        try:
            engines = self.engineList(targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            dList = [e.set_properties(properties) for e in engines]
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'set_properties')
            return d

    def has_properties(self, keys, targets='all'):
        log.msg("Checking properties on %r" % targets)
        try:
            engines = self.engineList(targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            dList = [e.has_properties(keys) for e in engines]
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'has_properties')
            return d

    def del_properties(self, keys, targets='all'):
        log.msg("Deleting properties on %r" % targets)
        try:
            engines = self.engineList(targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            dList = [e.del_properties(keys) for e in engines]
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'del_properties')
            return d

    def clear_properties(self, targets='all'):
        log.msg("Clearing properties on %r" % targets)
        try:
            engines = self.engineList(targets)
        except (error.InvalidEngineID, AttributeError, error.NoEnginesRegistered):
            return defer.fail(failure.Failure())
        else:
            dList = [e.clear_properties() for e in engines]
            d = gatherBoth(dList,
                           fireOnOneErrback=0,
                           consumeErrors=1,
                           logErrors=0)
            d.addCallback(error.collect_exceptions, 'clear_properties')
            return d
497
496
498
497
# Make any IControllerBase adaptable to IMultiEngine via MultiEngine.
components.registerAdapter(MultiEngine,
                           IControllerBase,
                           IMultiEngine)
502
501
503
502
504 #-------------------------------------------------------------------------------
503 #-------------------------------------------------------------------------------
505 # Interfaces for the Synchronous MultiEngine
504 # Interfaces for the Synchronous MultiEngine
506 #-------------------------------------------------------------------------------
505 #-------------------------------------------------------------------------------
507
506
class ISynchronousEngineMultiplexer(Interface):
    """Marker interface for the synchronous engine multiplexer."""
    pass
510
509
511
510
class ISynchronousMultiEngine(ISynchronousEngineMultiplexer):
    """Synchronous, two-phase version of IMultiEngine.

    Methods in this interface are identical to those of IMultiEngine, but they
    take one additional argument:

    execute(lines, targets='all') -> execute(lines, targets='all', block=True)

    :Parameters:
        block : boolean
            Should the method return a deferred to a deferredID or the
            actual result.  If block=False a deferred to a deferredID is
            returned and the user must call `get_pending_deferred` at a later
            point.  If block=True, a deferred to the actual result comes back.
    """
    def get_pending_deferred(deferredID, block=True):
        """Get the deferred for `deferredID`, optionally blocking on it."""

    def clear_pending_deferreds():
        """Clear all pending deferreds held by the manager."""
532
531
533
532
534 #-------------------------------------------------------------------------------
533 #-------------------------------------------------------------------------------
535 # Implementation of the Synchronous MultiEngine
534 # Implementation of the Synchronous MultiEngine
536 #-------------------------------------------------------------------------------
535 #-------------------------------------------------------------------------------
537
536
538 class SynchronousMultiEngine(PendingDeferredManager):
537 class SynchronousMultiEngine(PendingDeferredManager):
539 """Adapt an `IMultiEngine` -> `ISynchronousMultiEngine`
538 """Adapt an `IMultiEngine` -> `ISynchronousMultiEngine`
540
539
541 Warning, this class uses a decorator that currently uses **kwargs.
540 Warning, this class uses a decorator that currently uses **kwargs.
542 Because of this block must be passed as a kwarg, not positionally.
541 Because of this block must be passed as a kwarg, not positionally.
543 """
542 """
544
543
545 implements(ISynchronousMultiEngine)
544 implements(ISynchronousMultiEngine)
546
545
547 def __init__(self, multiengine):
546 def __init__(self, multiengine):
548 self.multiengine = multiengine
547 self.multiengine = multiengine
549 PendingDeferredManager.__init__(self)
548 PendingDeferredManager.__init__(self)
550
549
551 #---------------------------------------------------------------------------
550 #---------------------------------------------------------------------------
552 # Decorated pending deferred methods
551 # Decorated pending deferred methods
553 #---------------------------------------------------------------------------
552 #---------------------------------------------------------------------------
554
553
555 @two_phase
554 @two_phase
556 def execute(self, lines, targets='all'):
555 def execute(self, lines, targets='all'):
557 d = self.multiengine.execute(lines, targets)
556 d = self.multiengine.execute(lines, targets)
558 return d
557 return d
559
558
560 @two_phase
559 @two_phase
561 def push(self, namespace, targets='all'):
560 def push(self, namespace, targets='all'):
562 return self.multiengine.push(namespace, targets)
561 return self.multiengine.push(namespace, targets)
563
562
564 @two_phase
563 @two_phase
565 def pull(self, keys, targets='all'):
564 def pull(self, keys, targets='all'):
566 d = self.multiengine.pull(keys, targets)
565 d = self.multiengine.pull(keys, targets)
567 return d
566 return d
568
567
569 @two_phase
568 @two_phase
570 def push_function(self, namespace, targets='all'):
569 def push_function(self, namespace, targets='all'):
571 return self.multiengine.push_function(namespace, targets)
570 return self.multiengine.push_function(namespace, targets)
572
571
573 @two_phase
572 @two_phase
574 def pull_function(self, keys, targets='all'):
573 def pull_function(self, keys, targets='all'):
575 d = self.multiengine.pull_function(keys, targets)
574 d = self.multiengine.pull_function(keys, targets)
576 return d
575 return d
577
576
578 @two_phase
577 @two_phase
579 def get_result(self, i=None, targets='all'):
578 def get_result(self, i=None, targets='all'):
580 return self.multiengine.get_result(i, targets='all')
579 return self.multiengine.get_result(i, targets='all')
581
580
582 @two_phase
581 @two_phase
583 def reset(self, targets='all'):
582 def reset(self, targets='all'):
584 return self.multiengine.reset(targets)
583 return self.multiengine.reset(targets)
585
584
586 @two_phase
585 @two_phase
587 def keys(self, targets='all'):
586 def keys(self, targets='all'):
588 return self.multiengine.keys(targets)
587 return self.multiengine.keys(targets)
589
588
590 @two_phase
589 @two_phase
591 def kill(self, controller=False, targets='all'):
590 def kill(self, controller=False, targets='all'):
592 return self.multiengine.kill(controller, targets)
591 return self.multiengine.kill(controller, targets)
593
592
594 @two_phase
593 @two_phase
595 def push_serialized(self, namespace, targets='all'):
594 def push_serialized(self, namespace, targets='all'):
596 return self.multiengine.push_serialized(namespace, targets)
595 return self.multiengine.push_serialized(namespace, targets)
597
596
598 @two_phase
597 @two_phase
599 def pull_serialized(self, keys, targets='all'):
598 def pull_serialized(self, keys, targets='all'):
600 return self.multiengine.pull_serialized(keys, targets)
599 return self.multiengine.pull_serialized(keys, targets)
601
600
602 @two_phase
601 @two_phase
603 def clear_queue(self, targets='all'):
602 def clear_queue(self, targets='all'):
604 return self.multiengine.clear_queue(targets)
603 return self.multiengine.clear_queue(targets)
605
604
606 @two_phase
605 @two_phase
607 def queue_status(self, targets='all'):
606 def queue_status(self, targets='all'):
608 return self.multiengine.queue_status(targets)
607 return self.multiengine.queue_status(targets)
609
608
610 @two_phase
609 @two_phase
611 def set_properties(self, properties, targets='all'):
610 def set_properties(self, properties, targets='all'):
612 return self.multiengine.set_properties(properties, targets)
611 return self.multiengine.set_properties(properties, targets)
613
612
614 @two_phase
613 @two_phase
615 def get_properties(self, keys=None, targets='all'):
614 def get_properties(self, keys=None, targets='all'):
616 return self.multiengine.get_properties(keys, targets)
615 return self.multiengine.get_properties(keys, targets)
617
616
618 @two_phase
617 @two_phase
619 def has_properties(self, keys, targets='all'):
618 def has_properties(self, keys, targets='all'):
620 return self.multiengine.has_properties(keys, targets)
619 return self.multiengine.has_properties(keys, targets)
621
620
622 @two_phase
621 @two_phase
623 def del_properties(self, keys, targets='all'):
622 def del_properties(self, keys, targets='all'):
624 return self.multiengine.del_properties(keys, targets)
623 return self.multiengine.del_properties(keys, targets)
625
624
@two_phase
def clear_properties(self, targets='all'):
    """Remove all properties on `targets`.

    Delegates to the adapted multiengine; `two_phase` supplies the
    block/non-block request handling.
    """
    return self.multiengine.clear_properties(targets)
629
628
#---------------------------------------------------------------------------
# IMultiEngine methods
#---------------------------------------------------------------------------

def get_ids(self):
    """Return a list of registered engine ids.

    Never use the two phase block/non-block stuff for this; the result
    is returned directly from the adapted multiengine.
    """
    return self.multiengine.get_ids()
640
639
641
640
# Allow any IMultiEngine provider to be adapted to ISynchronousMultiEngine
# by wrapping it in SynchronousMultiEngine.
components.registerAdapter(SynchronousMultiEngine, IMultiEngine, ISynchronousMultiEngine)
643
642
644
643
#-------------------------------------------------------------------------------
# Various high-level interfaces that can be used as MultiEngine mix-ins
#-------------------------------------------------------------------------------

#-------------------------------------------------------------------------------
# IMultiEngineCoordinator
#-------------------------------------------------------------------------------

class IMultiEngineCoordinator(Interface):
    """Methods that work on multiple engines explicitly.

    NOTE: this is a zope-style interface declaration, so the method
    signatures intentionally omit ``self``.
    """

    def scatter(key, seq, dist='b', flatten=False, targets='all'):
        """Partition and distribute a sequence to targets."""

    def gather(key, dist='b', targets='all'):
        """Gather object key from targets."""

    def raw_map(func, seqs, dist='b', targets='all'):
        """
        A parallelized version of Python's builtin `map` function.

        This has a slightly different syntax than the builtin `map`.
        This is needed because we need to have keyword arguments and thus
        can't use *args to capture all the sequences.  Instead, they must
        be passed in a list or tuple.

        The equivalence is:

        raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)

        Most users will want to use parallel functions or the `mapper`
        and `map` methods for an API that follows that of the builtin
        `map`.
        """
679
678
680
679
class ISynchronousMultiEngineCoordinator(IMultiEngineCoordinator):
    """Methods that work on multiple engines explicitly.

    Same contract as `IMultiEngineCoordinator`, but every method takes an
    additional ``block`` keyword to select blocking/non-blocking execution.
    (zope-style interface: signatures intentionally omit ``self``.)
    """

    def scatter(key, seq, dist='b', flatten=False, targets='all', block=True):
        """Partition and distribute a sequence to targets."""

    def gather(key, dist='b', targets='all', block=True):
        """Gather object key from targets"""

    def raw_map(func, seqs, dist='b', targets='all', block=True):
        """
        A parallelized version of Python's builtin map.

        This has a slightly different syntax than the builtin `map`.
        This is needed because we need to have keyword arguments and thus
        can't use *args to capture all the sequences.  Instead, they must
        be passed in a list or tuple.

        raw_map(func, seqs) -> map(func, seqs[0], seqs[1], ...)

        Most users will want to use parallel functions or the `mapper`
        and `map` methods for an API that follows that of the builtin
        `map`.
        """
705
704
706
705
#-------------------------------------------------------------------------------
# IMultiEngineExtras
#-------------------------------------------------------------------------------

class IMultiEngineExtras(Interface):
    """Extra convenience methods for a multiengine.

    (zope-style interface: signatures intentionally omit ``self``.)
    """

    def zip_pull(targets, keys):
        """
        Pull, but return results in a different format from `pull`.

        This method basically returns zip(pull(targets, *keys)), with a few
        edge cases handled differently.  Users of chainsaw will find this format
        familiar.
        """

    def run(targets, fname):
        """Run a .py file on targets."""
724
723
725
724
class ISynchronousMultiEngineExtras(IMultiEngineExtras):
    """Synchronous variant of `IMultiEngineExtras`: adds a ``block`` keyword.

    (zope-style interface: signatures intentionally omit ``self``.)
    """

    def zip_pull(targets, keys, block=True):
        """
        Pull, but return results in a different format from `pull`.

        This method basically returns zip(pull(targets, *keys)), with a few
        edge cases handled differently.  Users of chainsaw will find this format
        familiar.
        """

    def run(targets, fname, block=True):
        """Run a .py file on targets."""
738
737
#-------------------------------------------------------------------------------
# The full MultiEngine interface
#-------------------------------------------------------------------------------

class IFullMultiEngine(IMultiEngine,
                       IMultiEngineCoordinator,
                       IMultiEngineExtras):
    """The full multiengine interface: core + coordinator + extras."""
    pass
747
746
748
747
class IFullSynchronousMultiEngine(ISynchronousMultiEngine,
                                  ISynchronousMultiEngineCoordinator,
                                  ISynchronousMultiEngineExtras):
    """The full synchronous multiengine interface: core + coordinator + extras."""
    pass
753
752
General Comments 0
You need to be logged in to leave comments. Login now