##// END OF EJS Templates
Added a log retrieval interface to Cluster.
Brian Granger -
Show More
@@ -1,584 +1,655 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Facilities for handling client connections to the controller."""
4 """Facilities for handling client connections to the controller."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008-2009 The IPython Development Team
7 # Copyright (C) 2008-2009 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 from __future__ import with_statement
17 import os
18 import os
18
19
19 from IPython.kernel.fcutil import Tub, find_furl
20 from IPython.kernel.fcutil import Tub, find_furl
20 from IPython.kernel.clusterdir import ClusterDir, ClusterDirError
21 from IPython.kernel.clusterdir import ClusterDir, ClusterDirError
21 from IPython.kernel.launcher import IPClusterLauncher
22 from IPython.kernel.launcher import IPClusterLauncher
22 from IPython.kernel.twistedutil import gatherBoth, make_deferred
23 from IPython.kernel.twistedutil import gatherBoth, make_deferred
23 from IPython.kernel.twistedutil import blockingCallFromThread
24 from IPython.kernel.twistedutil import blockingCallFromThread
24
25
25 from IPython.utils.importstring import import_item
26 from IPython.utils.importstring import import_item
26 from IPython.utils.genutils import get_ipython_dir
27 from IPython.utils.genutils import get_ipython_dir
27
28
28 from twisted.internet import defer
29 from twisted.internet import defer
29 from twisted.python import failure
30 from twisted.python import failure
30
31
31 #-----------------------------------------------------------------------------
32 #-----------------------------------------------------------------------------
32 # The ClientConnector class
33 # The ClientConnector class
33 #-----------------------------------------------------------------------------
34 #-----------------------------------------------------------------------------
34
35
35
36
36 class AsyncClientConnector(object):
37 class AsyncClientConnector(object):
37 """A class for getting remote references and clients from furls.
38 """A class for getting remote references and clients from furls.
38
39
39 This start a single :class:`Tub` for all remote reference and caches
40 This start a single :class:`Tub` for all remote reference and caches
40 references.
41 references.
41 """
42 """
42
43
43 def __init__(self):
44 def __init__(self):
44 self._remote_refs = {}
45 self._remote_refs = {}
45 self.tub = Tub()
46 self.tub = Tub()
46 self.tub.startService()
47 self.tub.startService()
47
48
48 def _find_furl(self, profile='default', cluster_dir=None,
49 def _find_furl(self, profile='default', cluster_dir=None,
49 furl_or_file=None, furl_file_name=None,
50 furl_or_file=None, furl_file_name=None,
50 ipythondir=None):
51 ipythondir=None):
51 """Find a FURL file by profile+ipythondir or cluster dir.
52 """Find a FURL file by profile+ipythondir or cluster dir.
52
53
53 This raises an exception if a FURL file can't be found.
54 This raises an exception if a FURL file can't be found.
54 """
55 """
55 # Try by furl_or_file
56 # Try by furl_or_file
56 if furl_or_file is not None:
57 if furl_or_file is not None:
57 try:
58 try:
58 furl = find_furl(furl_or_file)
59 furl = find_furl(furl_or_file)
59 except ValueError:
60 except ValueError:
60 return furl
61 return furl
61
62
62 if furl_file_name is None:
63 if furl_file_name is None:
63 raise ValueError('A furl_file_name must be provided')
64 raise ValueError('A furl_file_name must be provided')
64
65
65 # Try by cluster_dir
66 # Try by cluster_dir
66 if cluster_dir is not None:
67 if cluster_dir is not None:
67 cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
68 cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
68 sdir = cluster_dir_obj.security_dir
69 sdir = cluster_dir_obj.security_dir
69 furl_file = os.path.join(sdir, furl_file_name)
70 furl_file = os.path.join(sdir, furl_file_name)
70 return find_furl(furl_file)
71 return find_furl(furl_file)
71
72
72 # Try by profile
73 # Try by profile
73 if ipythondir is None:
74 if ipythondir is None:
74 ipythondir = get_ipython_dir()
75 ipythondir = get_ipython_dir()
75 if profile is not None:
76 if profile is not None:
76 cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
77 cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
77 ipythondir, profile)
78 ipythondir, profile)
78 sdir = cluster_dir_obj.security_dir
79 sdir = cluster_dir_obj.security_dir
79 furl_file = os.path.join(sdir, furl_file_name)
80 furl_file = os.path.join(sdir, furl_file_name)
80 return find_furl(furl_file)
81 return find_furl(furl_file)
81
82
82 raise ValueError('Could not find a valid FURL file.')
83 raise ValueError('Could not find a valid FURL file.')
83
84
84 def get_reference(self, furl_or_file):
85 def get_reference(self, furl_or_file):
85 """Get a remote reference using a furl or a file containing a furl.
86 """Get a remote reference using a furl or a file containing a furl.
86
87
87 Remote references are cached locally so once a remote reference
88 Remote references are cached locally so once a remote reference
88 has been retrieved for a given furl, the cached version is
89 has been retrieved for a given furl, the cached version is
89 returned.
90 returned.
90
91
91 Parameters
92 Parameters
92 ----------
93 ----------
93 furl_or_file : str
94 furl_or_file : str
94 A furl or a filename containing a furl
95 A furl or a filename containing a furl
95
96
96 Returns
97 Returns
97 -------
98 -------
98 A deferred to a remote reference
99 A deferred to a remote reference
99 """
100 """
100 furl = find_furl(furl_or_file)
101 furl = find_furl(furl_or_file)
101 if furl in self._remote_refs:
102 if furl in self._remote_refs:
102 d = defer.succeed(self._remote_refs[furl])
103 d = defer.succeed(self._remote_refs[furl])
103 else:
104 else:
104 d = self.tub.getReference(furl)
105 d = self.tub.getReference(furl)
105 d.addCallback(self._save_ref, furl)
106 d.addCallback(self._save_ref, furl)
106 return d
107 return d
107
108
108 def _save_ref(self, ref, furl):
109 def _save_ref(self, ref, furl):
109 """Cache a remote reference by its furl."""
110 """Cache a remote reference by its furl."""
110 self._remote_refs[furl] = ref
111 self._remote_refs[furl] = ref
111 return ref
112 return ref
112
113
113 def get_task_client(self, profile='default', cluster_dir=None,
114 def get_task_client(self, profile='default', cluster_dir=None,
114 furl_or_file=None, ipythondir=None):
115 furl_or_file=None, ipythondir=None):
115 """Get the task controller client.
116 """Get the task controller client.
116
117
117 This method is a simple wrapper around `get_client` that passes in
118 This method is a simple wrapper around `get_client` that passes in
118 the default name of the task client FURL file. Usually only
119 the default name of the task client FURL file. Usually only
119 the ``profile`` option will be needed. If a FURL file can't be
120 the ``profile`` option will be needed. If a FURL file can't be
120 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
121 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
121
122
122 Parameters
123 Parameters
123 ----------
124 ----------
124 profile : str
125 profile : str
125 The name of a cluster directory profile (default="default"). The
126 The name of a cluster directory profile (default="default"). The
126 cluster directory "cluster_<profile>" will be searched for
127 cluster directory "cluster_<profile>" will be searched for
127 in ``os.getcwd()``, the ipythondir and then in the directories
128 in ``os.getcwd()``, the ipythondir and then in the directories
128 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
129 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
129 cluster_dir : str
130 cluster_dir : str
130 The full path to a cluster directory. This is useful if profiles
131 The full path to a cluster directory. This is useful if profiles
131 are not being used.
132 are not being used.
132 furl_or_file : str
133 furl_or_file : str
133 A furl or a filename containing a FURLK. This is useful if you
134 A furl or a filename containing a FURLK. This is useful if you
134 simply know the location of the FURL file.
135 simply know the location of the FURL file.
135 ipythondir : str
136 ipythondir : str
136 The location of the ipythondir if different from the default.
137 The location of the ipythondir if different from the default.
137 This is used if the cluster directory is being found by profile.
138 This is used if the cluster directory is being found by profile.
138
139
139 Returns
140 Returns
140 -------
141 -------
141 A deferred to the actual client class.
142 A deferred to the actual client class.
142 """
143 """
143 return self.get_client(
144 return self.get_client(
144 profile, cluster_dir, furl_or_file,
145 profile, cluster_dir, furl_or_file,
145 'ipcontroller-tc.furl', ipythondir
146 'ipcontroller-tc.furl', ipythondir
146 )
147 )
147
148
148 def get_multiengine_client(self, profile='default', cluster_dir=None,
149 def get_multiengine_client(self, profile='default', cluster_dir=None,
149 furl_or_file=None, ipythondir=None):
150 furl_or_file=None, ipythondir=None):
150 """Get the multiengine controller client.
151 """Get the multiengine controller client.
151
152
152 This method is a simple wrapper around `get_client` that passes in
153 This method is a simple wrapper around `get_client` that passes in
153 the default name of the task client FURL file. Usually only
154 the default name of the task client FURL file. Usually only
154 the ``profile`` option will be needed. If a FURL file can't be
155 the ``profile`` option will be needed. If a FURL file can't be
155 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
156 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
156
157
157 Parameters
158 Parameters
158 ----------
159 ----------
159 profile : str
160 profile : str
160 The name of a cluster directory profile (default="default"). The
161 The name of a cluster directory profile (default="default"). The
161 cluster directory "cluster_<profile>" will be searched for
162 cluster directory "cluster_<profile>" will be searched for
162 in ``os.getcwd()``, the ipythondir and then in the directories
163 in ``os.getcwd()``, the ipythondir and then in the directories
163 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
164 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
164 cluster_dir : str
165 cluster_dir : str
165 The full path to a cluster directory. This is useful if profiles
166 The full path to a cluster directory. This is useful if profiles
166 are not being used.
167 are not being used.
167 furl_or_file : str
168 furl_or_file : str
168 A furl or a filename containing a FURLK. This is useful if you
169 A furl or a filename containing a FURLK. This is useful if you
169 simply know the location of the FURL file.
170 simply know the location of the FURL file.
170 ipythondir : str
171 ipythondir : str
171 The location of the ipythondir if different from the default.
172 The location of the ipythondir if different from the default.
172 This is used if the cluster directory is being found by profile.
173 This is used if the cluster directory is being found by profile.
173
174
174 Returns
175 Returns
175 -------
176 -------
176 A deferred to the actual client class.
177 A deferred to the actual client class.
177 """
178 """
178 return self.get_client(
179 return self.get_client(
179 profile, cluster_dir, furl_or_file,
180 profile, cluster_dir, furl_or_file,
180 'ipcontroller-mec.furl', ipythondir
181 'ipcontroller-mec.furl', ipythondir
181 )
182 )
182
183
183 def get_client(self, profile='default', cluster_dir=None,
184 def get_client(self, profile='default', cluster_dir=None,
184 furl_or_file=None, furl_file_name=None, ipythondir=None):
185 furl_or_file=None, furl_file_name=None, ipythondir=None):
185 """Get a remote reference and wrap it in a client by furl.
186 """Get a remote reference and wrap it in a client by furl.
186
187
187 This method is a simple wrapper around `get_client` that passes in
188 This method is a simple wrapper around `get_client` that passes in
188 the default name of the task client FURL file. Usually only
189 the default name of the task client FURL file. Usually only
189 the ``profile`` option will be needed. If a FURL file can't be
190 the ``profile`` option will be needed. If a FURL file can't be
190 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
191 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
191
192
192 Parameters
193 Parameters
193 ----------
194 ----------
194 profile : str
195 profile : str
195 The name of a cluster directory profile (default="default"). The
196 The name of a cluster directory profile (default="default"). The
196 cluster directory "cluster_<profile>" will be searched for
197 cluster directory "cluster_<profile>" will be searched for
197 in ``os.getcwd()``, the ipythondir and then in the directories
198 in ``os.getcwd()``, the ipythondir and then in the directories
198 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
199 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
199 cluster_dir : str
200 cluster_dir : str
200 The full path to a cluster directory. This is useful if profiles
201 The full path to a cluster directory. This is useful if profiles
201 are not being used.
202 are not being used.
202 furl_or_file : str
203 furl_or_file : str
203 A furl or a filename containing a FURLK. This is useful if you
204 A furl or a filename containing a FURLK. This is useful if you
204 simply know the location of the FURL file.
205 simply know the location of the FURL file.
205 furl_file_name : str
206 furl_file_name : str
206 The filename (not the full path) of the FURL. This must be
207 The filename (not the full path) of the FURL. This must be
207 provided if ``furl_or_file`` is not.
208 provided if ``furl_or_file`` is not.
208 ipythondir : str
209 ipythondir : str
209 The location of the ipythondir if different from the default.
210 The location of the ipythondir if different from the default.
210 This is used if the cluster directory is being found by profile.
211 This is used if the cluster directory is being found by profile.
211
212
212 Returns
213 Returns
213 -------
214 -------
214 A deferred to the actual client class.
215 A deferred to the actual client class.
215 """
216 """
216 try:
217 try:
217 furl = self._find_furl(
218 furl = self._find_furl(
218 profile, cluster_dir, furl_or_file,
219 profile, cluster_dir, furl_or_file,
219 furl_file_name, ipythondir
220 furl_file_name, ipythondir
220 )
221 )
221 except:
222 except:
222 return defer.fail(failure.Failure())
223 return defer.fail(failure.Failure())
223
224
224 d = self.get_reference(furl)
225 d = self.get_reference(furl)
225
226
226 def _wrap_remote_reference(rr):
227 def _wrap_remote_reference(rr):
227 d = rr.callRemote('get_client_name')
228 d = rr.callRemote('get_client_name')
228 d.addCallback(lambda name: import_item(name))
229 d.addCallback(lambda name: import_item(name))
229 def adapt(client_interface):
230 def adapt(client_interface):
230 client = client_interface(rr)
231 client = client_interface(rr)
231 client.tub = self.tub
232 client.tub = self.tub
232 return client
233 return client
233 d.addCallback(adapt)
234 d.addCallback(adapt)
234
235
235 return d
236 return d
236
237
237 d.addCallback(_wrap_remote_reference)
238 d.addCallback(_wrap_remote_reference)
238 return d
239 return d
239
240
240
241
241 class ClientConnector(object):
242 class ClientConnector(object):
242 """A blocking version of a client connector.
243 """A blocking version of a client connector.
243
244
244 This class creates a single :class:`Tub` instance and allows remote
245 This class creates a single :class:`Tub` instance and allows remote
245 references and client to be retrieved by their FURLs. Remote references
246 references and client to be retrieved by their FURLs. Remote references
246 are cached locally and FURL files can be found using profiles and cluster
247 are cached locally and FURL files can be found using profiles and cluster
247 directories.
248 directories.
248 """
249 """
249
250
250 def __init__(self):
251 def __init__(self):
251 self.async_cc = AsyncClientConnector()
252 self.async_cc = AsyncClientConnector()
252
253
253 def get_task_client(self, profile='default', cluster_dir=None,
254 def get_task_client(self, profile='default', cluster_dir=None,
254 furl_or_file=None, ipythondir=None):
255 furl_or_file=None, ipythondir=None):
255 """Get the task client.
256 """Get the task client.
256
257
257 Usually only the ``profile`` option will be needed. If a FURL file
258 Usually only the ``profile`` option will be needed. If a FURL file
258 can't be found by its profile, use ``cluster_dir`` or
259 can't be found by its profile, use ``cluster_dir`` or
259 ``furl_or_file``.
260 ``furl_or_file``.
260
261
261 Parameters
262 Parameters
262 ----------
263 ----------
263 profile : str
264 profile : str
264 The name of a cluster directory profile (default="default"). The
265 The name of a cluster directory profile (default="default"). The
265 cluster directory "cluster_<profile>" will be searched for
266 cluster directory "cluster_<profile>" will be searched for
266 in ``os.getcwd()``, the ipythondir and then in the directories
267 in ``os.getcwd()``, the ipythondir and then in the directories
267 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
268 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
268 cluster_dir : str
269 cluster_dir : str
269 The full path to a cluster directory. This is useful if profiles
270 The full path to a cluster directory. This is useful if profiles
270 are not being used.
271 are not being used.
271 furl_or_file : str
272 furl_or_file : str
272 A furl or a filename containing a FURLK. This is useful if you
273 A furl or a filename containing a FURLK. This is useful if you
273 simply know the location of the FURL file.
274 simply know the location of the FURL file.
274 ipythondir : str
275 ipythondir : str
275 The location of the ipythondir if different from the default.
276 The location of the ipythondir if different from the default.
276 This is used if the cluster directory is being found by profile.
277 This is used if the cluster directory is being found by profile.
277
278
278 Returns
279 Returns
279 -------
280 -------
280 The task client instance.
281 The task client instance.
281 """
282 """
282 client = blockingCallFromThread(
283 client = blockingCallFromThread(
283 self.async_cc.get_task_client, profile, cluster_dir,
284 self.async_cc.get_task_client, profile, cluster_dir,
284 furl_or_file, ipythondir
285 furl_or_file, ipythondir
285 )
286 )
286 return client.adapt_to_blocking_client()
287 return client.adapt_to_blocking_client()
287
288
288 def get_multiengine_client(self, profile='default', cluster_dir=None,
289 def get_multiengine_client(self, profile='default', cluster_dir=None,
289 furl_or_file=None, ipythondir=None):
290 furl_or_file=None, ipythondir=None):
290 """Get the multiengine client.
291 """Get the multiengine client.
291
292
292 Usually only the ``profile`` option will be needed. If a FURL file
293 Usually only the ``profile`` option will be needed. If a FURL file
293 can't be found by its profile, use ``cluster_dir`` or
294 can't be found by its profile, use ``cluster_dir`` or
294 ``furl_or_file``.
295 ``furl_or_file``.
295
296
296 Parameters
297 Parameters
297 ----------
298 ----------
298 profile : str
299 profile : str
299 The name of a cluster directory profile (default="default"). The
300 The name of a cluster directory profile (default="default"). The
300 cluster directory "cluster_<profile>" will be searched for
301 cluster directory "cluster_<profile>" will be searched for
301 in ``os.getcwd()``, the ipythondir and then in the directories
302 in ``os.getcwd()``, the ipythondir and then in the directories
302 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
303 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
303 cluster_dir : str
304 cluster_dir : str
304 The full path to a cluster directory. This is useful if profiles
305 The full path to a cluster directory. This is useful if profiles
305 are not being used.
306 are not being used.
306 furl_or_file : str
307 furl_or_file : str
307 A furl or a filename containing a FURLK. This is useful if you
308 A furl or a filename containing a FURLK. This is useful if you
308 simply know the location of the FURL file.
309 simply know the location of the FURL file.
309 ipythondir : str
310 ipythondir : str
310 The location of the ipythondir if different from the default.
311 The location of the ipythondir if different from the default.
311 This is used if the cluster directory is being found by profile.
312 This is used if the cluster directory is being found by profile.
312
313
313 Returns
314 Returns
314 -------
315 -------
315 The multiengine client instance.
316 The multiengine client instance.
316 """
317 """
317 client = blockingCallFromThread(
318 client = blockingCallFromThread(
318 self.async_cc.get_multiengine_client, profile, cluster_dir,
319 self.async_cc.get_multiengine_client, profile, cluster_dir,
319 furl_or_file, ipythondir
320 furl_or_file, ipythondir
320 )
321 )
321 return client.adapt_to_blocking_client()
322 return client.adapt_to_blocking_client()
322
323
323 def get_client(self, profile='default', cluster_dir=None,
324 def get_client(self, profile='default', cluster_dir=None,
324 furl_or_file=None, ipythondir=None):
325 furl_or_file=None, ipythondir=None):
325 client = blockingCallFromThread(
326 client = blockingCallFromThread(
326 self.async_cc.get_client, profile, cluster_dir,
327 self.async_cc.get_client, profile, cluster_dir,
327 furl_or_file, ipythondir
328 furl_or_file, ipythondir
328 )
329 )
329 return client.adapt_to_blocking_client()
330 return client.adapt_to_blocking_client()
330
331
331
332
332 class ClusterStateError(Exception):
333 class ClusterStateError(Exception):
333 pass
334 pass
334
335
335
336
336 class AsyncCluster(object):
337 class AsyncCluster(object):
337 """An class that wraps the :command:`ipcluster` script."""
338 """An class that wraps the :command:`ipcluster` script."""
338
339
339 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
340 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
340 auto_create=False, auto_stop=True):
341 auto_create=False, auto_stop=True):
341 """Create a class to manage an IPython cluster.
342 """Create a class to manage an IPython cluster.
342
343
343 This class calls the :command:`ipcluster` command with the right
344 This class calls the :command:`ipcluster` command with the right
344 options to start an IPython cluster. Typically a cluster directory
345 options to start an IPython cluster. Typically a cluster directory
345 must be created (:command:`ipcluster create`) and configured before
346 must be created (:command:`ipcluster create`) and configured before
346 using this class. Configuration is done by editing the
347 using this class. Configuration is done by editing the
347 configuration files in the top level of the cluster directory.
348 configuration files in the top level of the cluster directory.
348
349
349 Parameters
350 Parameters
350 ----------
351 ----------
351 profile : str
352 profile : str
352 The name of a cluster directory profile (default="default"). The
353 The name of a cluster directory profile (default="default"). The
353 cluster directory "cluster_<profile>" will be searched for
354 cluster directory "cluster_<profile>" will be searched for
354 in ``os.getcwd()``, the ipythondir and then in the directories
355 in ``os.getcwd()``, the ipythondir and then in the directories
355 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
356 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
356 cluster_dir : str
357 cluster_dir : str
357 The full path to a cluster directory. This is useful if profiles
358 The full path to a cluster directory. This is useful if profiles
358 are not being used.
359 are not being used.
359 furl_or_file : str
360 furl_or_file : str
360 A furl or a filename containing a FURLK. This is useful if you
361 A furl or a filename containing a FURLK. This is useful if you
361 simply know the location of the FURL file.
362 simply know the location of the FURL file.
362 ipythondir : str
363 ipythondir : str
363 The location of the ipythondir if different from the default.
364 The location of the ipythondir if different from the default.
364 This is used if the cluster directory is being found by profile.
365 This is used if the cluster directory is being found by profile.
365 auto_create : bool
366 auto_create : bool
366 Automatically create the cluster directory it is dones't exist.
367 Automatically create the cluster directory it is dones't exist.
367 This will usually only make sense if using a local cluster
368 This will usually only make sense if using a local cluster
368 (default=False).
369 (default=False).
369 auto_stop : bool
370 auto_stop : bool
370 Automatically stop the cluster when this instance is garbage
371 Automatically stop the cluster when this instance is garbage
371 collected (default=True). This is useful if you want the cluster
372 collected (default=True). This is useful if you want the cluster
372 to live beyond your current process. There is also an instance
373 to live beyond your current process. There is also an instance
373 attribute ``auto_stop`` to change this behavior.
374 attribute ``auto_stop`` to change this behavior.
374 """
375 """
375 self._setup_cluster_dir(profile, cluster_dir, ipythondir, auto_create)
376 self._setup_cluster_dir(profile, cluster_dir, ipythondir, auto_create)
376 self.state = 'before'
377 self.state = 'before'
377 self.launcher = None
378 self.launcher = None
378 self.client_connector = None
379 self.client_connector = None
379 self.auto_stop = auto_stop
380 self.auto_stop = auto_stop
380
381
381 def __del__(self):
382 def __del__(self):
382 if self.auto_stop and self.state=='running':
383 if self.auto_stop and self.state=='running':
383 print "Auto stopping the cluster..."
384 print "Auto stopping the cluster..."
384 self.stop()
385 self.stop()
385
386
386 @property
387 @property
387 def location(self):
388 def location(self):
388 if hasattr(self, 'cluster_dir_obj'):
389 if hasattr(self, 'cluster_dir_obj'):
389 return self.cluster_dir_obj.location
390 return self.cluster_dir_obj.location
390 else:
391 else:
391 return ''
392 return ''
392
393
393 @property
394 @property
394 def running(self):
395 def running(self):
395 if self.state=='running':
396 if self.state=='running':
396 return True
397 return True
397 else:
398 else:
398 return False
399 return False
399
400
400 def _setup_cluster_dir(self, profile, cluster_dir, ipythondir, auto_create):
401 def _setup_cluster_dir(self, profile, cluster_dir, ipythondir, auto_create):
401 if ipythondir is None:
402 if ipythondir is None:
402 ipythondir = get_ipython_dir()
403 ipythondir = get_ipython_dir()
403 if cluster_dir is not None:
404 if cluster_dir is not None:
404 try:
405 try:
405 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
406 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
406 except ClusterDirError:
407 except ClusterDirError:
407 pass
408 pass
408 if profile is not None:
409 if profile is not None:
409 try:
410 try:
410 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
411 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
411 ipythondir, profile)
412 ipythondir, profile)
412 except ClusterDirError:
413 except ClusterDirError:
413 pass
414 pass
414 if auto_create or profile=='default':
415 if auto_create or profile=='default':
415 # This should call 'ipcluster create --profile default
416 # This should call 'ipcluster create --profile default
416 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
417 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
417 ipythondir, profile)
418 ipythondir, profile)
418 else:
419 else:
419 raise ClusterDirError('Cluster dir not found.')
420 raise ClusterDirError('Cluster dir not found.')
420
421
421 @make_deferred
422 @make_deferred
422 def start(self, n=2):
423 def start(self, n=2):
423 """Start the IPython cluster with n engines.
424 """Start the IPython cluster with n engines.
424
425
425 Parameters
426 Parameters
426 ----------
427 ----------
427 n : int
428 n : int
428 The number of engine to start.
429 The number of engine to start.
429 """
430 """
430 # We might want to add logic to test if the cluster has started
431 # We might want to add logic to test if the cluster has started
431 # by another process....
432 # by another process....
432 if not self.state=='running':
433 if not self.state=='running':
433 self.launcher = IPClusterLauncher(os.getcwd())
434 self.launcher = IPClusterLauncher(os.getcwd())
434 self.launcher.ipcluster_n = n
435 self.launcher.ipcluster_n = n
435 self.launcher.ipcluster_subcommand = 'start'
436 self.launcher.ipcluster_subcommand = 'start'
436 d = self.launcher.start()
437 d = self.launcher.start()
437 d.addCallback(self._handle_start)
438 d.addCallback(self._handle_start)
438 return d
439 return d
439 else:
440 else:
440 raise ClusterStateError('Cluster is already running')
441 raise ClusterStateError('Cluster is already running')
441
442
442 @make_deferred
443 @make_deferred
443 def stop(self):
444 def stop(self):
444 """Stop the IPython cluster if it is running."""
445 """Stop the IPython cluster if it is running."""
445 if self.state=='running':
446 if self.state=='running':
446 d1 = self.launcher.observe_stop()
447 d1 = self.launcher.observe_stop()
447 d1.addCallback(self._handle_stop)
448 d1.addCallback(self._handle_stop)
448 d2 = self.launcher.stop()
449 d2 = self.launcher.stop()
449 return gatherBoth([d1, d2], consumeErrors=True)
450 return gatherBoth([d1, d2], consumeErrors=True)
450 else:
451 else:
451 raise ClusterStateError("Cluster not running")
452 raise ClusterStateError("Cluster not running")
452
453
453 def get_multiengine_client(self):
454 def get_multiengine_client(self):
454 """Get the multiengine client for the running cluster.
455 """Get the multiengine client for the running cluster.
455
456
456 If this fails, it means that the cluster has not finished starting.
457 If this fails, it means that the cluster has not finished starting.
457 Usually waiting a few seconds are re-trying will solve this.
458 Usually waiting a few seconds are re-trying will solve this.
458 """
459 """
459 if self.client_connector is None:
460 if self.client_connector is None:
460 self.client_connector = AsyncClientConnector()
461 self.client_connector = AsyncClientConnector()
461 return self.client_connector.get_multiengine_client(
462 return self.client_connector.get_multiengine_client(
462 cluster_dir=self.cluster_dir_obj.location
463 cluster_dir=self.cluster_dir_obj.location
463 )
464 )
464
465
465 def get_task_client(self):
466 def get_task_client(self):
466 """Get the task client for the running cluster.
467 """Get the task client for the running cluster.
467
468
468 If this fails, it means that the cluster has not finished starting.
469 If this fails, it means that the cluster has not finished starting.
469 Usually waiting a few seconds are re-trying will solve this.
470 Usually waiting a few seconds are re-trying will solve this.
470 """
471 """
471 if self.client_connector is None:
472 if self.client_connector is None:
472 self.client_connector = AsyncClientConnector()
473 self.client_connector = AsyncClientConnector()
473 return self.client_connector.get_task_client(
474 return self.client_connector.get_task_client(
474 cluster_dir=self.cluster_dir_obj.location
475 cluster_dir=self.cluster_dir_obj.location
475 )
476 )
476
477
478 def get_ipengine_logs(self):
479 return self.get_logs_by_name('ipengine')
480
481 def get_ipcontroller_logs(self):
482 return self.get_logs_by_name('ipcontroller')
483
484 def get_ipcluster_logs(self):
485 return self.get_logs_by_name('ipcluster')
486
487 def get_logs_by_name(self, name='ipcluster'):
488 log_dir = self.cluster_dir_obj.log_dir
489 logs = {}
490 for log in os.listdir(log_dir):
491 if log.startswith(name + '-') and log.endswith('.log'):
492 with open(os.path.join(log_dir, log), 'r') as f:
493 logs[log] = f.read()
494 return logs
495
496 def get_logs(self):
497 d = self.get_ipcluster_logs()
498 d.update(self.get_ipengine_logs())
499 d.update(self.get_ipcontroller_logs())
500 return d
501
477 def _handle_start(self, r):
502 def _handle_start(self, r):
478 self.state = 'running'
503 self.state = 'running'
479
504
480 def _handle_stop(self, r):
505 def _handle_stop(self, r):
481 self.state = 'after'
506 self.state = 'after'
482
507
483
508
484 class Cluster(object):
509 class Cluster(object):
485
510
486
511
487 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
512 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
488 auto_create=False, auto_stop=True):
513 auto_create=False, auto_stop=True):
489 """Create a class to manage an IPython cluster.
514 """Create a class to manage an IPython cluster.
490
515
491 This class calls the :command:`ipcluster` command with the right
516 This class calls the :command:`ipcluster` command with the right
492 options to start an IPython cluster. Typically a cluster directory
517 options to start an IPython cluster. Typically a cluster directory
493 must be created (:command:`ipcluster create`) and configured before
518 must be created (:command:`ipcluster create`) and configured before
494 using this class. Configuration is done by editing the
519 using this class. Configuration is done by editing the
495 configuration files in the top level of the cluster directory.
520 configuration files in the top level of the cluster directory.
496
521
497 Parameters
522 Parameters
498 ----------
523 ----------
499 profile : str
524 profile : str
500 The name of a cluster directory profile (default="default"). The
525 The name of a cluster directory profile (default="default"). The
501 cluster directory "cluster_<profile>" will be searched for
526 cluster directory "cluster_<profile>" will be searched for
502 in ``os.getcwd()``, the ipythondir and then in the directories
527 in ``os.getcwd()``, the ipythondir and then in the directories
503 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
528 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
504 cluster_dir : str
529 cluster_dir : str
505 The full path to a cluster directory. This is useful if profiles
530 The full path to a cluster directory. This is useful if profiles
506 are not being used.
531 are not being used.
507 furl_or_file : str
532 furl_or_file : str
508 A furl or a filename containing a FURLK. This is useful if you
533 A furl or a filename containing a FURLK. This is useful if you
509 simply know the location of the FURL file.
534 simply know the location of the FURL file.
510 ipythondir : str
535 ipythondir : str
511 The location of the ipythondir if different from the default.
536 The location of the ipythondir if different from the default.
512 This is used if the cluster directory is being found by profile.
537 This is used if the cluster directory is being found by profile.
513 auto_create : bool
538 auto_create : bool
514 Automatically create the cluster directory it is dones't exist.
539 Automatically create the cluster directory it is dones't exist.
515 This will usually only make sense if using a local cluster
540 This will usually only make sense if using a local cluster
516 (default=False).
541 (default=False).
517 auto_stop : bool
542 auto_stop : bool
518 Automatically stop the cluster when this instance is garbage
543 Automatically stop the cluster when this instance is garbage
519 collected (default=True). This is useful if you want the cluster
544 collected (default=True). This is useful if you want the cluster
520 to live beyond your current process. There is also an instance
545 to live beyond your current process. There is also an instance
521 attribute ``auto_stop`` to change this behavior.
546 attribute ``auto_stop`` to change this behavior.
522 """
547 """
523 self.async_cluster = AsyncCluster(
548 self.async_cluster = AsyncCluster(
524 profile, cluster_dir, ipythondir, auto_create, auto_stop
549 profile, cluster_dir, ipythondir, auto_create, auto_stop
525 )
550 )
526 self.cluster_dir_obj = self.async_cluster.cluster_dir_obj
551 self.cluster_dir_obj = self.async_cluster.cluster_dir_obj
527 self.client_connector = None
552 self.client_connector = None
528
553
529 def _set_auto_stop(self, value):
554 def _set_auto_stop(self, value):
530 self.async_cluster.auto_stop = value
555 self.async_cluster.auto_stop = value
531
556
532 def _get_auto_stop(self):
557 def _get_auto_stop(self):
533 return self.async_cluster.auto_stop
558 return self.async_cluster.auto_stop
534
559
535 auto_stop = property(_get_auto_stop, _set_auto_stop)
560 auto_stop = property(_get_auto_stop, _set_auto_stop)
536
561
537 @property
562 @property
538 def location(self):
563 def location(self):
539 return self.async_cluster.location
564 return self.async_cluster.location
540
565
541 @property
566 @property
542 def running(self):
567 def running(self):
543 return self.async_cluster.running
568 return self.async_cluster.running
544
569
545 def start(self, n=2):
570 def start(self, n=2):
546 """Start the IPython cluster with n engines.
571 """Start the IPython cluster with n engines.
547
572
548 Parameters
573 Parameters
549 ----------
574 ----------
550 n : int
575 n : int
551 The number of engine to start.
576 The number of engine to start.
552 """
577 """
553 return blockingCallFromThread(self.async_cluster.start, n)
578 return blockingCallFromThread(self.async_cluster.start, n)
554
579
555 def stop(self):
580 def stop(self):
556 """Stop the IPython cluster if it is running."""
581 """Stop the IPython cluster if it is running."""
557 return blockingCallFromThread(self.async_cluster.stop)
582 return blockingCallFromThread(self.async_cluster.stop)
558
583
559 def get_multiengine_client(self):
584 def get_multiengine_client(self):
560 """Get the multiengine client for the running cluster.
585 """Get the multiengine client for the running cluster.
561
586
562 If this fails, it means that the cluster has not finished starting.
587 If this fails, it means that the cluster has not finished starting.
563 Usually waiting a few seconds are re-trying will solve this.
588 Usually waiting a few seconds are re-trying will solve this.
564 """
589 """
565 if self.client_connector is None:
590 if self.client_connector is None:
566 self.client_connector = ClientConnector()
591 self.client_connector = ClientConnector()
567 return self.client_connector.get_multiengine_client(
592 return self.client_connector.get_multiengine_client(
568 cluster_dir=self.cluster_dir_obj.location
593 cluster_dir=self.cluster_dir_obj.location
569 )
594 )
570
595
571 def get_task_client(self):
596 def get_task_client(self):
572 """Get the task client for the running cluster.
597 """Get the task client for the running cluster.
573
598
574 If this fails, it means that the cluster has not finished starting.
599 If this fails, it means that the cluster has not finished starting.
575 Usually waiting a few seconds are re-trying will solve this.
600 Usually waiting a few seconds are re-trying will solve this.
576 """
601 """
577 if self.client_connector is None:
602 if self.client_connector is None:
578 self.client_connector = ClientConnector()
603 self.client_connector = ClientConnector()
579 return self.client_connector.get_task_client(
604 return self.client_connector.get_task_client(
580 cluster_dir=self.cluster_dir_obj.location
605 cluster_dir=self.cluster_dir_obj.location
581 )
606 )
582
607
583
608 def __repr__(self):
609 s = "<Cluster(running=%r, location=%s)" % (self.running, self.location)
610 return s
611
612 def get_logs_by_name(self, name='ipcluter'):
613 """Get a dict of logs by process name (ipcluster, ipengine, etc.)"""
614 return self.async_cluster.get_logs_by_name(name)
615
616 def get_ipengine_logs(self):
617 """Get a dict of logs for all engines in this cluster."""
618 return self.async_cluster.get_ipengine_logs()
619
620 def get_ipcontroller_logs(self):
621 """Get a dict of logs for the controller in this cluster."""
622 return self.async_cluster.get_ipcontroller_logs()
623
624 def get_ipcluster_logs(self):
625 """Get a dict of the ipcluster logs for this cluster."""
626 return self.async_cluster.get_ipcluster_logs()
627
628 def get_logs(self):
629 """Get a dict of all logs for this cluster."""
630 return self.async_cluster.get_logs()
631
632 def _print_logs(self, logs):
633 for k, v in logs.iteritems():
634 print "==================================="
635 print "Logfile: %s" % k
636 print "==================================="
637 print v
638 print
639
640 def print_ipengine_logs(self):
641 """Print the ipengine logs for this cluster to stdout."""
642 self._print_logs(self.get_ipengine_logs())
643
644 def print_ipcontroller_logs(self):
645 """Print the ipcontroller logs for this cluster to stdout."""
646 self._print_logs(self.get_ipcontroller_logs())
647
648 def print_ipcluster_logs(self):
649 """Print the ipcluster logs for this cluster to stdout."""
650 self._print_logs(self.get_ipcluster_logs())
651
652 def print_logs(self):
653 """Print all the logs for this cluster to stdout."""
654 self._print_logs(self.get_logs())
584
655
General Comments 0
You need to be logged in to leave comments. Login now