##// END OF EJS Templates
Work on engine/client reconnect logic.
Brian Granger -
Show More
@@ -1,655 +1,713 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Facilities for handling client connections to the controller."""
4 """Facilities for handling client connections to the controller."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008-2009 The IPython Development Team
7 # Copyright (C) 2008-2009 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 from __future__ import with_statement
17 from __future__ import with_statement
18 import os
18 import os
19
19
20 from IPython.kernel.fcutil import Tub, find_furl
20 from IPython.kernel.fcutil import (
21 Tub,
22 find_furl,
23 is_valid_furl_or_file,
24 validate_furl_or_file,
25 FURLError
26 )
21 from IPython.kernel.clusterdir import ClusterDir, ClusterDirError
27 from IPython.kernel.clusterdir import ClusterDir, ClusterDirError
22 from IPython.kernel.launcher import IPClusterLauncher
28 from IPython.kernel.launcher import IPClusterLauncher
23 from IPython.kernel.twistedutil import gatherBoth, make_deferred
29 from IPython.kernel.twistedutil import (
24 from IPython.kernel.twistedutil import blockingCallFromThread
30 gatherBoth,
25
31 make_deferred,
32 blockingCallFromThread,
33 sleep_deferred
34 )
26 from IPython.utils.importstring import import_item
35 from IPython.utils.importstring import import_item
27 from IPython.utils.genutils import get_ipython_dir
36 from IPython.utils.genutils import get_ipython_dir
28
37
29 from twisted.internet import defer
38 from twisted.internet import defer
30 from twisted.python import failure
39 from twisted.internet.defer import inlineCallbacks, returnValue
40 from twisted.python import failure, log
31
41
32 #-----------------------------------------------------------------------------
42 #-----------------------------------------------------------------------------
33 # The ClientConnector class
43 # The ClientConnector class
34 #-----------------------------------------------------------------------------
44 #-----------------------------------------------------------------------------
35
45
46 DELAY = 0.2
47 MAX_TRIES = 9
48
49
50 class ClientConnectorError(Exception):
51 pass
52
36
53
37 class AsyncClientConnector(object):
54 class AsyncClientConnector(object):
38 """A class for getting remote references and clients from furls.
55 """A class for getting remote references and clients from furls.
39
56
40 This start a single :class:`Tub` for all remote reference and caches
57 This start a single :class:`Tub` for all remote reference and caches
41 references.
58 references.
42 """
59 """
43
60
44 def __init__(self):
61 def __init__(self):
45 self._remote_refs = {}
62 self._remote_refs = {}
46 self.tub = Tub()
63 self.tub = Tub()
47 self.tub.startService()
64 self.tub.startService()
48
65
49 def _find_furl(self, profile='default', cluster_dir=None,
66 def _find_furl(self, profile='default', cluster_dir=None,
50 furl_or_file=None, furl_file_name=None,
67 furl_or_file=None, furl_file_name=None,
51 ipythondir=None):
68 ipythondir=None):
52 """Find a FURL file by profile+ipythondir or cluster dir.
69 """Find a FURL file by profile+ipythondir or cluster dir.
53
70
54 This raises an exception if a FURL file can't be found.
71 This raises an :exc:`~IPython.kernel.fcutil.FURLError` exception
72 if a FURL file can't be found.
55 """
73 """
56 # Try by furl_or_file
74 # Try by furl_or_file
57 if furl_or_file is not None:
75 if furl_or_file is not None:
58 try:
76 validate_furl_or_file(furl_or_file)
59 furl = find_furl(furl_or_file)
77 return furl_or_file
60 except ValueError:
61 return furl
62
78
63 if furl_file_name is None:
79 if furl_file_name is None:
64 raise ValueError('A furl_file_name must be provided')
80 raise FURLError('A furl_file_name must be provided')
65
81
66 # Try by cluster_dir
82 # Try by cluster_dir
67 if cluster_dir is not None:
83 if cluster_dir is not None:
68 cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
84 cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
69 sdir = cluster_dir_obj.security_dir
85 sdir = cluster_dir_obj.security_dir
70 furl_file = os.path.join(sdir, furl_file_name)
86 furl_file = os.path.join(sdir, furl_file_name)
71 return find_furl(furl_file)
87 validate_furl_or_file(furl_file)
88 return furl_file
72
89
73 # Try by profile
90 # Try by profile
74 if ipythondir is None:
91 if ipythondir is None:
75 ipythondir = get_ipython_dir()
92 ipythondir = get_ipython_dir()
76 if profile is not None:
93 if profile is not None:
77 cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
94 cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
78 ipythondir, profile)
95 ipythondir, profile)
79 sdir = cluster_dir_obj.security_dir
96 sdir = cluster_dir_obj.security_dir
80 furl_file = os.path.join(sdir, furl_file_name)
97 furl_file = os.path.join(sdir, furl_file_name)
81 return find_furl(furl_file)
98 validate_furl_or_file(furl_file)
99 return furl_file
82
100
83 raise ValueError('Could not find a valid FURL file.')
101 raise FURLError('Could not find a valid FURL file.')
84
102
85 def get_reference(self, furl_or_file):
103 def get_reference(self, furl_or_file):
86 """Get a remote reference using a furl or a file containing a furl.
104 """Get a remote reference using a furl or a file containing a furl.
87
105
88 Remote references are cached locally so once a remote reference
106 Remote references are cached locally so once a remote reference
89 has been retrieved for a given furl, the cached version is
107 has been retrieved for a given furl, the cached version is
90 returned.
108 returned.
91
109
92 Parameters
110 Parameters
93 ----------
111 ----------
94 furl_or_file : str
112 furl_or_file : str
95 A furl or a filename containing a furl
113 A furl or a filename containing a furl. This should already be
114 validated, but might not yet exist.
96
115
97 Returns
116 Returns
98 -------
117 -------
99 A deferred to a remote reference
118 A deferred to a remote reference
100 """
119 """
101 furl = find_furl(furl_or_file)
120 furl = furl_or_file
102 if furl in self._remote_refs:
121 if furl in self._remote_refs:
103 d = defer.succeed(self._remote_refs[furl])
122 d = defer.succeed(self._remote_refs[furl])
104 else:
123 else:
105 d = self.tub.getReference(furl)
124 d = self.tub.getReference(furl)
106 d.addCallback(self._save_ref, furl)
125 d.addCallback(self._save_ref, furl)
107 return d
126 return d
108
127
109 def _save_ref(self, ref, furl):
128 def _save_ref(self, ref, furl):
110 """Cache a remote reference by its furl."""
129 """Cache a remote reference by its furl."""
111 self._remote_refs[furl] = ref
130 self._remote_refs[furl] = ref
112 return ref
131 return ref
113
132
114 def get_task_client(self, profile='default', cluster_dir=None,
133 def get_task_client(self, profile='default', cluster_dir=None,
115 furl_or_file=None, ipythondir=None):
134 furl_or_file=None, ipythondir=None,
135 delay=DELAY, max_tries=MAX_TRIES):
116 """Get the task controller client.
136 """Get the task controller client.
117
137
118 This method is a simple wrapper around `get_client` that passes in
138 This method is a simple wrapper around `get_client` that passes in
119 the default name of the task client FURL file. Usually only
139 the default name of the task client FURL file. Usually only
120 the ``profile`` option will be needed. If a FURL file can't be
140 the ``profile`` option will be needed. If a FURL file can't be
121 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
141 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
122
142
123 Parameters
143 Parameters
124 ----------
144 ----------
125 profile : str
145 profile : str
126 The name of a cluster directory profile (default="default"). The
146 The name of a cluster directory profile (default="default"). The
127 cluster directory "cluster_<profile>" will be searched for
147 cluster directory "cluster_<profile>" will be searched for
128 in ``os.getcwd()``, the ipythondir and then in the directories
148 in ``os.getcwd()``, the ipythondir and then in the directories
129 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
149 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
130 cluster_dir : str
150 cluster_dir : str
131 The full path to a cluster directory. This is useful if profiles
151 The full path to a cluster directory. This is useful if profiles
132 are not being used.
152 are not being used.
133 furl_or_file : str
153 furl_or_file : str
134 A furl or a filename containing a FURLK. This is useful if you
154 A furl or a filename containing a FURLK. This is useful if you
135 simply know the location of the FURL file.
155 simply know the location of the FURL file.
136 ipythondir : str
156 ipythondir : str
137 The location of the ipythondir if different from the default.
157 The location of the ipythondir if different from the default.
138 This is used if the cluster directory is being found by profile.
158 This is used if the cluster directory is being found by profile.
139
159
140 Returns
160 Returns
141 -------
161 -------
142 A deferred to the actual client class.
162 A deferred to the actual client class.
143 """
163 """
144 return self.get_client(
164 return self.get_client(
145 profile, cluster_dir, furl_or_file,
165 profile, cluster_dir, furl_or_file,
146 'ipcontroller-tc.furl', ipythondir
166 'ipcontroller-tc.furl', ipythondir,
167 delay, max_tries
147 )
168 )
148
169
149 def get_multiengine_client(self, profile='default', cluster_dir=None,
170 def get_multiengine_client(self, profile='default', cluster_dir=None,
150 furl_or_file=None, ipythondir=None):
171 furl_or_file=None, ipythondir=None,
172 delay=DELAY, max_tries=MAX_TRIES):
151 """Get the multiengine controller client.
173 """Get the multiengine controller client.
152
174
153 This method is a simple wrapper around `get_client` that passes in
175 This method is a simple wrapper around `get_client` that passes in
154 the default name of the task client FURL file. Usually only
176 the default name of the task client FURL file. Usually only
155 the ``profile`` option will be needed. If a FURL file can't be
177 the ``profile`` option will be needed. If a FURL file can't be
156 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
178 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
157
179
158 Parameters
180 Parameters
159 ----------
181 ----------
160 profile : str
182 profile : str
161 The name of a cluster directory profile (default="default"). The
183 The name of a cluster directory profile (default="default"). The
162 cluster directory "cluster_<profile>" will be searched for
184 cluster directory "cluster_<profile>" will be searched for
163 in ``os.getcwd()``, the ipythondir and then in the directories
185 in ``os.getcwd()``, the ipythondir and then in the directories
164 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
186 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
165 cluster_dir : str
187 cluster_dir : str
166 The full path to a cluster directory. This is useful if profiles
188 The full path to a cluster directory. This is useful if profiles
167 are not being used.
189 are not being used.
168 furl_or_file : str
190 furl_or_file : str
169 A furl or a filename containing a FURLK. This is useful if you
191 A furl or a filename containing a FURLK. This is useful if you
170 simply know the location of the FURL file.
192 simply know the location of the FURL file.
171 ipythondir : str
193 ipythondir : str
172 The location of the ipythondir if different from the default.
194 The location of the ipythondir if different from the default.
173 This is used if the cluster directory is being found by profile.
195 This is used if the cluster directory is being found by profile.
174
196
175 Returns
197 Returns
176 -------
198 -------
177 A deferred to the actual client class.
199 A deferred to the actual client class.
178 """
200 """
179 return self.get_client(
201 return self.get_client(
180 profile, cluster_dir, furl_or_file,
202 profile, cluster_dir, furl_or_file,
181 'ipcontroller-mec.furl', ipythondir
203 'ipcontroller-mec.furl', ipythondir,
204 delay, max_tries
182 )
205 )
183
206
184 def get_client(self, profile='default', cluster_dir=None,
207 def get_client(self, profile='default', cluster_dir=None,
185 furl_or_file=None, furl_file_name=None, ipythondir=None):
208 furl_or_file=None, furl_file_name=None, ipythondir=None,
209 delay=DELAY, max_tries=MAX_TRIES):
186 """Get a remote reference and wrap it in a client by furl.
210 """Get a remote reference and wrap it in a client by furl.
187
211
188 This method is a simple wrapper around `get_client` that passes in
212 This method is a simple wrapper around `get_client` that passes in
189 the default name of the task client FURL file. Usually only
213 the default name of the task client FURL file. Usually only
190 the ``profile`` option will be needed. If a FURL file can't be
214 the ``profile`` option will be needed. If a FURL file can't be
191 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
215 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
192
216
193 Parameters
217 Parameters
194 ----------
218 ----------
195 profile : str
219 profile : str
196 The name of a cluster directory profile (default="default"). The
220 The name of a cluster directory profile (default="default"). The
197 cluster directory "cluster_<profile>" will be searched for
221 cluster directory "cluster_<profile>" will be searched for
198 in ``os.getcwd()``, the ipythondir and then in the directories
222 in ``os.getcwd()``, the ipythondir and then in the directories
199 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
223 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
200 cluster_dir : str
224 cluster_dir : str
201 The full path to a cluster directory. This is useful if profiles
225 The full path to a cluster directory. This is useful if profiles
202 are not being used.
226 are not being used.
203 furl_or_file : str
227 furl_or_file : str
204 A furl or a filename containing a FURLK. This is useful if you
228 A furl or a filename containing a FURL. This is useful if you
205 simply know the location of the FURL file.
229 simply know the location of the FURL file.
206 furl_file_name : str
230 furl_file_name : str
207 The filename (not the full path) of the FURL. This must be
231 The filename (not the full path) of the FURL. This must be
208 provided if ``furl_or_file`` is not.
232 provided if ``furl_or_file`` is not.
209 ipythondir : str
233 ipythondir : str
210 The location of the ipythondir if different from the default.
234 The location of the ipythondir if different from the default.
211 This is used if the cluster directory is being found by profile.
235 This is used if the cluster directory is being found by profile.
212
236
213 Returns
237 Returns
214 -------
238 -------
215 A deferred to the actual client class.
239 A deferred to the actual client class. Or a failure to a
240 :exc:`FURLError`.
216 """
241 """
217 try:
242 try:
218 furl = self._find_furl(
243 furl_file = self._find_furl(
219 profile, cluster_dir, furl_or_file,
244 profile, cluster_dir, furl_or_file,
220 furl_file_name, ipythondir
245 furl_file_name, ipythondir
221 )
246 )
222 except:
247 except FURLError:
223 return defer.fail(failure.Failure())
248 return defer.fail(failure.Failure())
224
249
225 d = self.get_reference(furl)
226
227 def _wrap_remote_reference(rr):
250 def _wrap_remote_reference(rr):
228 d = rr.callRemote('get_client_name')
251 d = rr.callRemote('get_client_name')
229 d.addCallback(lambda name: import_item(name))
252 d.addCallback(lambda name: import_item(name))
230 def adapt(client_interface):
253 def adapt(client_interface):
231 client = client_interface(rr)
254 client = client_interface(rr)
232 client.tub = self.tub
255 client.tub = self.tub
233 return client
256 return client
234 d.addCallback(adapt)
257 d.addCallback(adapt)
235
258
236 return d
259 return d
237
260
261 d = self._try_to_connect(furl_file, delay, max_tries, attempt=0)
238 d.addCallback(_wrap_remote_reference)
262 d.addCallback(_wrap_remote_reference)
239 return d
263 return d
240
264
265 @inlineCallbacks
266 def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
267 """Try to connect to the controller with retry logic."""
268 if attempt < max_tries:
269 log.msg("Connecting to controller [%r]: %s" % \
270 (attempt, furl_or_file))
271 try:
272 self.furl = find_furl(furl_or_file)
273 # Uncomment this to see the FURL being tried.
274 # log.msg("FURL: %s" % self.furl)
275 rr = yield self.get_reference(self.furl)
276 except:
277 if attempt==max_tries-1:
278 # This will propagate the exception all the way to the top
279 # where it can be handled.
280 raise
281 else:
282 yield sleep_deferred(delay)
283 rr = yield self._try_to_connect(
284 furl_or_file, 1.5*delay, max_tries, attempt+1
285 )
286 returnValue(rr)
287 else:
288 returnValue(rr)
289 else:
290 raise ClientConnectorError(
291 'Could not connect to controller, max_tries (%r) exceeded. '
292 'This usually means that i) the controller was not started, '
293 'or ii) a firewall was blocking the client from connecting '
294 'to the controller.' % max_tries
295 )
296
241
297
242 class ClientConnector(object):
298 class ClientConnector(object):
243 """A blocking version of a client connector.
299 """A blocking version of a client connector.
244
300
245 This class creates a single :class:`Tub` instance and allows remote
301 This class creates a single :class:`Tub` instance and allows remote
246 references and client to be retrieved by their FURLs. Remote references
302 references and client to be retrieved by their FURLs. Remote references
247 are cached locally and FURL files can be found using profiles and cluster
303 are cached locally and FURL files can be found using profiles and cluster
248 directories.
304 directories.
249 """
305 """
250
306
251 def __init__(self):
307 def __init__(self):
252 self.async_cc = AsyncClientConnector()
308 self.async_cc = AsyncClientConnector()
253
309
254 def get_task_client(self, profile='default', cluster_dir=None,
310 def get_task_client(self, profile='default', cluster_dir=None,
255 furl_or_file=None, ipythondir=None):
311 furl_or_file=None, ipythondir=None,
312 delay=DELAY, max_tries=MAX_TRIES):
256 """Get the task client.
313 """Get the task client.
257
314
258 Usually only the ``profile`` option will be needed. If a FURL file
315 Usually only the ``profile`` option will be needed. If a FURL file
259 can't be found by its profile, use ``cluster_dir`` or
316 can't be found by its profile, use ``cluster_dir`` or
260 ``furl_or_file``.
317 ``furl_or_file``.
261
318
262 Parameters
319 Parameters
263 ----------
320 ----------
264 profile : str
321 profile : str
265 The name of a cluster directory profile (default="default"). The
322 The name of a cluster directory profile (default="default"). The
266 cluster directory "cluster_<profile>" will be searched for
323 cluster directory "cluster_<profile>" will be searched for
267 in ``os.getcwd()``, the ipythondir and then in the directories
324 in ``os.getcwd()``, the ipythondir and then in the directories
268 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
325 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
269 cluster_dir : str
326 cluster_dir : str
270 The full path to a cluster directory. This is useful if profiles
327 The full path to a cluster directory. This is useful if profiles
271 are not being used.
328 are not being used.
272 furl_or_file : str
329 furl_or_file : str
273 A furl or a filename containing a FURLK. This is useful if you
330 A furl or a filename containing a FURLK. This is useful if you
274 simply know the location of the FURL file.
331 simply know the location of the FURL file.
275 ipythondir : str
332 ipythondir : str
276 The location of the ipythondir if different from the default.
333 The location of the ipythondir if different from the default.
277 This is used if the cluster directory is being found by profile.
334 This is used if the cluster directory is being found by profile.
278
335
279 Returns
336 Returns
280 -------
337 -------
281 The task client instance.
338 The task client instance.
282 """
339 """
283 client = blockingCallFromThread(
340 client = blockingCallFromThread(
284 self.async_cc.get_task_client, profile, cluster_dir,
341 self.async_cc.get_task_client, profile, cluster_dir,
285 furl_or_file, ipythondir
342 furl_or_file, ipythondir, delay, max_tries
286 )
343 )
287 return client.adapt_to_blocking_client()
344 return client.adapt_to_blocking_client()
288
345
289 def get_multiengine_client(self, profile='default', cluster_dir=None,
346 def get_multiengine_client(self, profile='default', cluster_dir=None,
290 furl_or_file=None, ipythondir=None):
347 furl_or_file=None, ipythondir=None,
348 delay=DELAY, max_tries=MAX_TRIES):
291 """Get the multiengine client.
349 """Get the multiengine client.
292
350
293 Usually only the ``profile`` option will be needed. If a FURL file
351 Usually only the ``profile`` option will be needed. If a FURL file
294 can't be found by its profile, use ``cluster_dir`` or
352 can't be found by its profile, use ``cluster_dir`` or
295 ``furl_or_file``.
353 ``furl_or_file``.
296
354
297 Parameters
355 Parameters
298 ----------
356 ----------
299 profile : str
357 profile : str
300 The name of a cluster directory profile (default="default"). The
358 The name of a cluster directory profile (default="default"). The
301 cluster directory "cluster_<profile>" will be searched for
359 cluster directory "cluster_<profile>" will be searched for
302 in ``os.getcwd()``, the ipythondir and then in the directories
360 in ``os.getcwd()``, the ipythondir and then in the directories
303 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
361 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
304 cluster_dir : str
362 cluster_dir : str
305 The full path to a cluster directory. This is useful if profiles
363 The full path to a cluster directory. This is useful if profiles
306 are not being used.
364 are not being used.
307 furl_or_file : str
365 furl_or_file : str
308 A furl or a filename containing a FURLK. This is useful if you
366 A furl or a filename containing a FURLK. This is useful if you
309 simply know the location of the FURL file.
367 simply know the location of the FURL file.
310 ipythondir : str
368 ipythondir : str
311 The location of the ipythondir if different from the default.
369 The location of the ipythondir if different from the default.
312 This is used if the cluster directory is being found by profile.
370 This is used if the cluster directory is being found by profile.
313
371
314 Returns
372 Returns
315 -------
373 -------
316 The multiengine client instance.
374 The multiengine client instance.
317 """
375 """
318 client = blockingCallFromThread(
376 client = blockingCallFromThread(
319 self.async_cc.get_multiengine_client, profile, cluster_dir,
377 self.async_cc.get_multiengine_client, profile, cluster_dir,
320 furl_or_file, ipythondir
378 furl_or_file, ipythondir, delay, max_tries
321 )
379 )
322 return client.adapt_to_blocking_client()
380 return client.adapt_to_blocking_client()
323
381
324 def get_client(self, profile='default', cluster_dir=None,
382 def get_client(self, profile='default', cluster_dir=None,
325 furl_or_file=None, ipythondir=None):
383 furl_or_file=None, ipythondir=None,
384 delay=DELAY, max_tries=MAX_TRIES):
326 client = blockingCallFromThread(
385 client = blockingCallFromThread(
327 self.async_cc.get_client, profile, cluster_dir,
386 self.async_cc.get_client, profile, cluster_dir,
328 furl_or_file, ipythondir
387 furl_or_file, ipythondir,
388 delay, max_tries
329 )
389 )
330 return client.adapt_to_blocking_client()
390 return client.adapt_to_blocking_client()
331
391
332
392
333 class ClusterStateError(Exception):
393 class ClusterStateError(Exception):
334 pass
394 pass
335
395
336
396
337 class AsyncCluster(object):
397 class AsyncCluster(object):
338 """An class that wraps the :command:`ipcluster` script."""
398 """An class that wraps the :command:`ipcluster` script."""
339
399
340 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
400 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
341 auto_create=False, auto_stop=True):
401 auto_create=False, auto_stop=True):
342 """Create a class to manage an IPython cluster.
402 """Create a class to manage an IPython cluster.
343
403
344 This class calls the :command:`ipcluster` command with the right
404 This class calls the :command:`ipcluster` command with the right
345 options to start an IPython cluster. Typically a cluster directory
405 options to start an IPython cluster. Typically a cluster directory
346 must be created (:command:`ipcluster create`) and configured before
406 must be created (:command:`ipcluster create`) and configured before
347 using this class. Configuration is done by editing the
407 using this class. Configuration is done by editing the
348 configuration files in the top level of the cluster directory.
408 configuration files in the top level of the cluster directory.
349
409
350 Parameters
410 Parameters
351 ----------
411 ----------
352 profile : str
412 profile : str
353 The name of a cluster directory profile (default="default"). The
413 The name of a cluster directory profile (default="default"). The
354 cluster directory "cluster_<profile>" will be searched for
414 cluster directory "cluster_<profile>" will be searched for
355 in ``os.getcwd()``, the ipythondir and then in the directories
415 in ``os.getcwd()``, the ipythondir and then in the directories
356 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
416 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
357 cluster_dir : str
417 cluster_dir : str
358 The full path to a cluster directory. This is useful if profiles
418 The full path to a cluster directory. This is useful if profiles
359 are not being used.
419 are not being used.
360 furl_or_file : str
361 A furl or a filename containing a FURLK. This is useful if you
362 simply know the location of the FURL file.
363 ipythondir : str
420 ipythondir : str
364 The location of the ipythondir if different from the default.
421 The location of the ipythondir if different from the default.
365 This is used if the cluster directory is being found by profile.
422 This is used if the cluster directory is being found by profile.
366 auto_create : bool
423 auto_create : bool
367 Automatically create the cluster directory it is dones't exist.
424 Automatically create the cluster directory it is dones't exist.
368 This will usually only make sense if using a local cluster
425 This will usually only make sense if using a local cluster
369 (default=False).
426 (default=False).
370 auto_stop : bool
427 auto_stop : bool
371 Automatically stop the cluster when this instance is garbage
428 Automatically stop the cluster when this instance is garbage
372 collected (default=True). This is useful if you want the cluster
429 collected (default=True). This is useful if you want the cluster
373 to live beyond your current process. There is also an instance
430 to live beyond your current process. There is also an instance
374 attribute ``auto_stop`` to change this behavior.
431 attribute ``auto_stop`` to change this behavior.
375 """
432 """
376 self._setup_cluster_dir(profile, cluster_dir, ipythondir, auto_create)
433 self._setup_cluster_dir(profile, cluster_dir, ipythondir, auto_create)
377 self.state = 'before'
434 self.state = 'before'
378 self.launcher = None
435 self.launcher = None
379 self.client_connector = None
436 self.client_connector = None
380 self.auto_stop = auto_stop
437 self.auto_stop = auto_stop
381
438
382 def __del__(self):
439 def __del__(self):
383 if self.auto_stop and self.state=='running':
440 if self.auto_stop and self.state=='running':
384 print "Auto stopping the cluster..."
441 print "Auto stopping the cluster..."
385 self.stop()
442 self.stop()
386
443
387 @property
444 @property
388 def location(self):
445 def location(self):
389 if hasattr(self, 'cluster_dir_obj'):
446 if hasattr(self, 'cluster_dir_obj'):
390 return self.cluster_dir_obj.location
447 return self.cluster_dir_obj.location
391 else:
448 else:
392 return ''
449 return ''
393
450
394 @property
451 @property
395 def running(self):
452 def running(self):
396 if self.state=='running':
453 if self.state=='running':
397 return True
454 return True
398 else:
455 else:
399 return False
456 return False
400
457
401 def _setup_cluster_dir(self, profile, cluster_dir, ipythondir, auto_create):
458 def _setup_cluster_dir(self, profile, cluster_dir, ipythondir, auto_create):
402 if ipythondir is None:
459 if ipythondir is None:
403 ipythondir = get_ipython_dir()
460 ipythondir = get_ipython_dir()
404 if cluster_dir is not None:
461 if cluster_dir is not None:
405 try:
462 try:
406 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
463 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
407 except ClusterDirError:
464 except ClusterDirError:
408 pass
465 pass
409 if profile is not None:
466 if profile is not None:
410 try:
467 try:
411 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
468 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
412 ipythondir, profile)
469 ipythondir, profile)
413 except ClusterDirError:
470 except ClusterDirError:
414 pass
471 pass
415 if auto_create or profile=='default':
472 if auto_create or profile=='default':
416 # This should call 'ipcluster create --profile default
473 # This should call 'ipcluster create --profile default
417 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
474 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
418 ipythondir, profile)
475 ipythondir, profile)
419 else:
476 else:
420 raise ClusterDirError('Cluster dir not found.')
477 raise ClusterDirError('Cluster dir not found.')
421
478
422 @make_deferred
479 @make_deferred
423 def start(self, n=2):
480 def start(self, n=2):
424 """Start the IPython cluster with n engines.
481 """Start the IPython cluster with n engines.
425
482
426 Parameters
483 Parameters
427 ----------
484 ----------
428 n : int
485 n : int
429 The number of engine to start.
486 The number of engine to start.
430 """
487 """
431 # We might want to add logic to test if the cluster has started
488 # We might want to add logic to test if the cluster has started
432 # by another process....
489 # by another process....
433 if not self.state=='running':
490 if not self.state=='running':
434 self.launcher = IPClusterLauncher(os.getcwd())
491 self.launcher = IPClusterLauncher(os.getcwd())
435 self.launcher.ipcluster_n = n
492 self.launcher.ipcluster_n = n
436 self.launcher.ipcluster_subcommand = 'start'
493 self.launcher.ipcluster_subcommand = 'start'
437 d = self.launcher.start()
494 d = self.launcher.start()
438 d.addCallback(self._handle_start)
495 d.addCallback(self._handle_start)
439 return d
496 return d
440 else:
497 else:
441 raise ClusterStateError('Cluster is already running')
498 raise ClusterStateError('Cluster is already running')
442
499
443 @make_deferred
500 @make_deferred
444 def stop(self):
501 def stop(self):
445 """Stop the IPython cluster if it is running."""
502 """Stop the IPython cluster if it is running."""
446 if self.state=='running':
503 if self.state=='running':
447 d1 = self.launcher.observe_stop()
504 d1 = self.launcher.observe_stop()
448 d1.addCallback(self._handle_stop)
505 d1.addCallback(self._handle_stop)
449 d2 = self.launcher.stop()
506 d2 = self.launcher.stop()
450 return gatherBoth([d1, d2], consumeErrors=True)
507 return gatherBoth([d1, d2], consumeErrors=True)
451 else:
508 else:
452 raise ClusterStateError("Cluster not running")
509 raise ClusterStateError("Cluster not running")
453
510
454 def get_multiengine_client(self):
511 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
455 """Get the multiengine client for the running cluster.
512 """Get the multiengine client for the running cluster.
456
513
457 If this fails, it means that the cluster has not finished starting.
514 If this fails, it means that the cluster has not finished starting.
458 Usually waiting a few seconds are re-trying will solve this.
515 Usually waiting a few seconds are re-trying will solve this.
459 """
516 """
460 if self.client_connector is None:
517 if self.client_connector is None:
461 self.client_connector = AsyncClientConnector()
518 self.client_connector = AsyncClientConnector()
462 return self.client_connector.get_multiengine_client(
519 return self.client_connector.get_multiengine_client(
463 cluster_dir=self.cluster_dir_obj.location
520 cluster_dir=self.cluster_dir_obj.location,
521 delay=delay, max_tries=max_tries
464 )
522 )
465
523
466 def get_task_client(self):
524 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
467 """Get the task client for the running cluster.
525 """Get the task client for the running cluster.
468
526
469 If this fails, it means that the cluster has not finished starting.
527 If this fails, it means that the cluster has not finished starting.
470 Usually waiting a few seconds are re-trying will solve this.
528 Usually waiting a few seconds are re-trying will solve this.
471 """
529 """
472 if self.client_connector is None:
530 if self.client_connector is None:
473 self.client_connector = AsyncClientConnector()
531 self.client_connector = AsyncClientConnector()
474 return self.client_connector.get_task_client(
532 return self.client_connector.get_task_client(
475 cluster_dir=self.cluster_dir_obj.location
533 cluster_dir=self.cluster_dir_obj.location,
534 delay=delay, max_tries=max_tries
476 )
535 )
477
536
478 def get_ipengine_logs(self):
537 def get_ipengine_logs(self):
479 return self.get_logs_by_name('ipengine')
538 return self.get_logs_by_name('ipengine')
480
539
481 def get_ipcontroller_logs(self):
540 def get_ipcontroller_logs(self):
482 return self.get_logs_by_name('ipcontroller')
541 return self.get_logs_by_name('ipcontroller')
483
542
484 def get_ipcluster_logs(self):
543 def get_ipcluster_logs(self):
485 return self.get_logs_by_name('ipcluster')
544 return self.get_logs_by_name('ipcluster')
486
545
487 def get_logs_by_name(self, name='ipcluster'):
546 def get_logs_by_name(self, name='ipcluster'):
488 log_dir = self.cluster_dir_obj.log_dir
547 log_dir = self.cluster_dir_obj.log_dir
489 logs = {}
548 logs = {}
490 for log in os.listdir(log_dir):
549 for log in os.listdir(log_dir):
491 if log.startswith(name + '-') and log.endswith('.log'):
550 if log.startswith(name + '-') and log.endswith('.log'):
492 with open(os.path.join(log_dir, log), 'r') as f:
551 with open(os.path.join(log_dir, log), 'r') as f:
493 logs[log] = f.read()
552 logs[log] = f.read()
494 return logs
553 return logs
495
554
496 def get_logs(self):
555 def get_logs(self):
497 d = self.get_ipcluster_logs()
556 d = self.get_ipcluster_logs()
498 d.update(self.get_ipengine_logs())
557 d.update(self.get_ipengine_logs())
499 d.update(self.get_ipcontroller_logs())
558 d.update(self.get_ipcontroller_logs())
500 return d
559 return d
501
560
502 def _handle_start(self, r):
561 def _handle_start(self, r):
503 self.state = 'running'
562 self.state = 'running'
504
563
505 def _handle_stop(self, r):
564 def _handle_stop(self, r):
506 self.state = 'after'
565 self.state = 'after'
507
566
508
567
509 class Cluster(object):
568 class Cluster(object):
510
569
511
570
512 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
571 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
513 auto_create=False, auto_stop=True):
572 auto_create=False, auto_stop=True):
514 """Create a class to manage an IPython cluster.
573 """Create a class to manage an IPython cluster.
515
574
516 This class calls the :command:`ipcluster` command with the right
575 This class calls the :command:`ipcluster` command with the right
517 options to start an IPython cluster. Typically a cluster directory
576 options to start an IPython cluster. Typically a cluster directory
518 must be created (:command:`ipcluster create`) and configured before
577 must be created (:command:`ipcluster create`) and configured before
519 using this class. Configuration is done by editing the
578 using this class. Configuration is done by editing the
520 configuration files in the top level of the cluster directory.
579 configuration files in the top level of the cluster directory.
521
580
522 Parameters
581 Parameters
523 ----------
582 ----------
524 profile : str
583 profile : str
525 The name of a cluster directory profile (default="default"). The
584 The name of a cluster directory profile (default="default"). The
526 cluster directory "cluster_<profile>" will be searched for
585 cluster directory "cluster_<profile>" will be searched for
527 in ``os.getcwd()``, the ipythondir and then in the directories
586 in ``os.getcwd()``, the ipythondir and then in the directories
528 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
587 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
529 cluster_dir : str
588 cluster_dir : str
530 The full path to a cluster directory. This is useful if profiles
589 The full path to a cluster directory. This is useful if profiles
531 are not being used.
590 are not being used.
532 furl_or_file : str
533 A furl or a filename containing a FURLK. This is useful if you
534 simply know the location of the FURL file.
535 ipythondir : str
591 ipythondir : str
536 The location of the ipythondir if different from the default.
592 The location of the ipythondir if different from the default.
537 This is used if the cluster directory is being found by profile.
593 This is used if the cluster directory is being found by profile.
538 auto_create : bool
594 auto_create : bool
539 Automatically create the cluster directory it is dones't exist.
595 Automatically create the cluster directory it is dones't exist.
540 This will usually only make sense if using a local cluster
596 This will usually only make sense if using a local cluster
541 (default=False).
597 (default=False).
542 auto_stop : bool
598 auto_stop : bool
543 Automatically stop the cluster when this instance is garbage
599 Automatically stop the cluster when this instance is garbage
544 collected (default=True). This is useful if you want the cluster
600 collected (default=True). This is useful if you want the cluster
545 to live beyond your current process. There is also an instance
601 to live beyond your current process. There is also an instance
546 attribute ``auto_stop`` to change this behavior.
602 attribute ``auto_stop`` to change this behavior.
547 """
603 """
548 self.async_cluster = AsyncCluster(
604 self.async_cluster = AsyncCluster(
549 profile, cluster_dir, ipythondir, auto_create, auto_stop
605 profile, cluster_dir, ipythondir, auto_create, auto_stop
550 )
606 )
551 self.cluster_dir_obj = self.async_cluster.cluster_dir_obj
607 self.cluster_dir_obj = self.async_cluster.cluster_dir_obj
552 self.client_connector = None
608 self.client_connector = None
553
609
554 def _set_auto_stop(self, value):
610 def _set_auto_stop(self, value):
555 self.async_cluster.auto_stop = value
611 self.async_cluster.auto_stop = value
556
612
557 def _get_auto_stop(self):
613 def _get_auto_stop(self):
558 return self.async_cluster.auto_stop
614 return self.async_cluster.auto_stop
559
615
560 auto_stop = property(_get_auto_stop, _set_auto_stop)
616 auto_stop = property(_get_auto_stop, _set_auto_stop)
561
617
562 @property
618 @property
563 def location(self):
619 def location(self):
564 return self.async_cluster.location
620 return self.async_cluster.location
565
621
566 @property
622 @property
567 def running(self):
623 def running(self):
568 return self.async_cluster.running
624 return self.async_cluster.running
569
625
570 def start(self, n=2):
626 def start(self, n=2):
571 """Start the IPython cluster with n engines.
627 """Start the IPython cluster with n engines.
572
628
573 Parameters
629 Parameters
574 ----------
630 ----------
575 n : int
631 n : int
576 The number of engine to start.
632 The number of engine to start.
577 """
633 """
578 return blockingCallFromThread(self.async_cluster.start, n)
634 return blockingCallFromThread(self.async_cluster.start, n)
579
635
580 def stop(self):
636 def stop(self):
581 """Stop the IPython cluster if it is running."""
637 """Stop the IPython cluster if it is running."""
582 return blockingCallFromThread(self.async_cluster.stop)
638 return blockingCallFromThread(self.async_cluster.stop)
583
639
584 def get_multiengine_client(self):
640 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
585 """Get the multiengine client for the running cluster.
641 """Get the multiengine client for the running cluster.
586
642
587 If this fails, it means that the cluster has not finished starting.
643 If this fails, it means that the cluster has not finished starting.
588 Usually waiting a few seconds are re-trying will solve this.
644 Usually waiting a few seconds are re-trying will solve this.
589 """
645 """
590 if self.client_connector is None:
646 if self.client_connector is None:
591 self.client_connector = ClientConnector()
647 self.client_connector = ClientConnector()
592 return self.client_connector.get_multiengine_client(
648 return self.client_connector.get_multiengine_client(
593 cluster_dir=self.cluster_dir_obj.location
649 cluster_dir=self.cluster_dir_obj.location,
650 delay=delay, max_tries=max_tries
594 )
651 )
595
652
596 def get_task_client(self):
653 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
597 """Get the task client for the running cluster.
654 """Get the task client for the running cluster.
598
655
599 If this fails, it means that the cluster has not finished starting.
656 If this fails, it means that the cluster has not finished starting.
600 Usually waiting a few seconds are re-trying will solve this.
657 Usually waiting a few seconds are re-trying will solve this.
601 """
658 """
602 if self.client_connector is None:
659 if self.client_connector is None:
603 self.client_connector = ClientConnector()
660 self.client_connector = ClientConnector()
604 return self.client_connector.get_task_client(
661 return self.client_connector.get_task_client(
605 cluster_dir=self.cluster_dir_obj.location
662 cluster_dir=self.cluster_dir_obj.location,
663 delay=delay, max_tries=max_tries
606 )
664 )
607
665
608 def __repr__(self):
666 def __repr__(self):
609 s = "<Cluster(running=%r, location=%s)" % (self.running, self.location)
667 s = "<Cluster(running=%r, location=%s)" % (self.running, self.location)
610 return s
668 return s
611
669
612 def get_logs_by_name(self, name='ipcluter'):
670 def get_logs_by_name(self, name='ipcluter'):
613 """Get a dict of logs by process name (ipcluster, ipengine, etc.)"""
671 """Get a dict of logs by process name (ipcluster, ipengine, etc.)"""
614 return self.async_cluster.get_logs_by_name(name)
672 return self.async_cluster.get_logs_by_name(name)
615
673
616 def get_ipengine_logs(self):
674 def get_ipengine_logs(self):
617 """Get a dict of logs for all engines in this cluster."""
675 """Get a dict of logs for all engines in this cluster."""
618 return self.async_cluster.get_ipengine_logs()
676 return self.async_cluster.get_ipengine_logs()
619
677
620 def get_ipcontroller_logs(self):
678 def get_ipcontroller_logs(self):
621 """Get a dict of logs for the controller in this cluster."""
679 """Get a dict of logs for the controller in this cluster."""
622 return self.async_cluster.get_ipcontroller_logs()
680 return self.async_cluster.get_ipcontroller_logs()
623
681
624 def get_ipcluster_logs(self):
682 def get_ipcluster_logs(self):
625 """Get a dict of the ipcluster logs for this cluster."""
683 """Get a dict of the ipcluster logs for this cluster."""
626 return self.async_cluster.get_ipcluster_logs()
684 return self.async_cluster.get_ipcluster_logs()
627
685
628 def get_logs(self):
686 def get_logs(self):
629 """Get a dict of all logs for this cluster."""
687 """Get a dict of all logs for this cluster."""
630 return self.async_cluster.get_logs()
688 return self.async_cluster.get_logs()
631
689
632 def _print_logs(self, logs):
690 def _print_logs(self, logs):
633 for k, v in logs.iteritems():
691 for k, v in logs.iteritems():
634 print "==================================="
692 print "==================================="
635 print "Logfile: %s" % k
693 print "Logfile: %s" % k
636 print "==================================="
694 print "==================================="
637 print v
695 print v
638 print
696 print
639
697
640 def print_ipengine_logs(self):
698 def print_ipengine_logs(self):
641 """Print the ipengine logs for this cluster to stdout."""
699 """Print the ipengine logs for this cluster to stdout."""
642 self._print_logs(self.get_ipengine_logs())
700 self._print_logs(self.get_ipengine_logs())
643
701
644 def print_ipcontroller_logs(self):
702 def print_ipcontroller_logs(self):
645 """Print the ipcontroller logs for this cluster to stdout."""
703 """Print the ipcontroller logs for this cluster to stdout."""
646 self._print_logs(self.get_ipcontroller_logs())
704 self._print_logs(self.get_ipcontroller_logs())
647
705
648 def print_ipcluster_logs(self):
706 def print_ipcluster_logs(self):
649 """Print the ipcluster logs for this cluster to stdout."""
707 """Print the ipcluster logs for this cluster to stdout."""
650 self._print_logs(self.get_ipcluster_logs())
708 self._print_logs(self.get_ipcluster_logs())
651
709
652 def print_logs(self):
710 def print_logs(self):
653 """Print all the logs for this cluster to stdout."""
711 """Print all the logs for this cluster to stdout."""
654 self._print_logs(self.get_logs())
712 self._print_logs(self.get_logs())
655
713
@@ -1,130 +1,139 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """A class that manages the engines connection to the controller."""
4 """A class that manages the engines connection to the controller."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008-2009 The IPython Development Team
7 # Copyright (C) 2008-2009 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import os
17 import os
18 import cPickle as pickle
18 import cPickle as pickle
19
19
20 from twisted.python import log, failure
20 from twisted.python import log, failure
21 from twisted.internet import defer
21 from twisted.internet import defer
22 from twisted.internet.defer import inlineCallbacks, returnValue
22 from twisted.internet.defer import inlineCallbacks, returnValue
23
23
24 from IPython.kernel.fcutil import find_furl
24 from IPython.kernel.fcutil import find_furl, validate_furl_or_file
25 from IPython.kernel.enginefc import IFCEngine
25 from IPython.kernel.enginefc import IFCEngine
26 from IPython.kernel.twistedutil import sleep_deferred
26 from IPython.kernel.twistedutil import sleep_deferred, make_deferred
27
27
28 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
29 # The ClientConnector class
29 # The ClientConnector class
30 #-----------------------------------------------------------------------------
30 #-----------------------------------------------------------------------------
31
31
32
32
33 class EngineConnectorError(Exception):
33 class EngineConnectorError(Exception):
34 pass
34 pass
35
35
36
36
37 class EngineConnector(object):
37 class EngineConnector(object):
38 """Manage an engines connection to a controller.
38 """Manage an engines connection to a controller.
39
39
40 This class takes a foolscap `Tub` and provides a `connect_to_controller`
40 This class takes a foolscap `Tub` and provides a `connect_to_controller`
41 method that will use the `Tub` to connect to a controller and register
41 method that will use the `Tub` to connect to a controller and register
42 the engine with the controller.
42 the engine with the controller.
43 """
43 """
44
44
45 def __init__(self, tub):
45 def __init__(self, tub):
46 self.tub = tub
46 self.tub = tub
47
47
48 @make_deferred
48 def connect_to_controller(self, engine_service, furl_or_file,
49 def connect_to_controller(self, engine_service, furl_or_file,
49 delay=0.1, max_tries=10):
50 delay=0.1, max_tries=10):
50 """
51 """
51 Make a connection to a controller specified by a furl.
52 Make a connection to a controller specified by a furl.
52
53
53 This method takes an `IEngineBase` instance and a foolcap URL and uses
54 This method takes an `IEngineBase` instance and a foolcap URL and uses
54 the `tub` attribute to make a connection to the controller. The
55 the `tub` attribute to make a connection to the controller. The
55 foolscap URL contains all the information needed to connect to the
56 foolscap URL contains all the information needed to connect to the
56 controller, including the ip and port as well as any encryption and
57 controller, including the ip and port as well as any encryption and
57 authentication information needed for the connection.
58 authentication information needed for the connection.
58
59
59 After getting a reference to the controller, this method calls the
60 After getting a reference to the controller, this method calls the
60 `register_engine` method of the controller to actually register the
61 `register_engine` method of the controller to actually register the
61 engine.
62 engine.
62
63
63 This method will try to connect to the controller multiple times with
64 This method will try to connect to the controller multiple times with
64 a delay in between. Each time the FURL file is read anew.
65 a delay in between. Each time the FURL file is read anew.
65
66
66 Parameters
67 Parameters
67 __________
68 __________
68 engine_service : IEngineBase
69 engine_service : IEngineBase
69 An instance of an `IEngineBase` implementer
70 An instance of an `IEngineBase` implementer
70 furl_or_file : str
71 furl_or_file : str
71 A furl or a filename containing a furl
72 A furl or a filename containing a furl
72 delay : float
73 delay : float
73 The intial time to wait between connection attempts. Subsequent
74 The intial time to wait between connection attempts. Subsequent
74 attempts have increasing delays.
75 attempts have increasing delays.
75 max_tries : int
76 max_tries : int
76 The maximum number of connection attempts.
77 The maximum number of connection attempts.
78
79 Returns
80 -------
81 A deferred to the registered client or a failure to an error
82 like :exc:`FURLError`.
77 """
83 """
78 if not self.tub.running:
84 if not self.tub.running:
79 self.tub.startService()
85 self.tub.startService()
80 self.engine_service = engine_service
86 self.engine_service = engine_service
81 self.engine_reference = IFCEngine(self.engine_service)
87 self.engine_reference = IFCEngine(self.engine_service)
82
88
89 validate_furl_or_file(furl_or_file)
83 d = self._try_to_connect(furl_or_file, delay, max_tries, attempt=0)
90 d = self._try_to_connect(furl_or_file, delay, max_tries, attempt=0)
91 d.addCallback(self._register)
84 return d
92 return d
85
93
86 @inlineCallbacks
94 @inlineCallbacks
87 def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
95 def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
88 """Try to connect to the controller with retry logic."""
96 """Try to connect to the controller with retry logic."""
89 if attempt < max_tries:
97 if attempt < max_tries:
90 log.msg("Attempting to connect to controller [%r]: %s" % \
98 log.msg("Attempting to connect to controller [%r]: %s" % \
91 (attempt, furl_or_file))
99 (attempt, furl_or_file))
92 try:
100 try:
93 self.furl = find_furl(furl_or_file)
101 self.furl = find_furl(furl_or_file)
94 # Uncomment this to see the FURL being tried.
102 # Uncomment this to see the FURL being tried.
95 # log.msg("FURL: %s" % self.furl)
103 # log.msg("FURL: %s" % self.furl)
96 rr = yield self.tub.getReference(self.furl)
104 rr = yield self.tub.getReference(self.furl)
97 except:
105 except:
98 if attempt==max_tries-1:
106 if attempt==max_tries-1:
99 # This will propagate the exception all the way to the top
107 # This will propagate the exception all the way to the top
100 # where it can be handled.
108 # where it can be handled.
101 raise
109 raise
102 else:
110 else:
103 yield sleep_deferred(delay)
111 yield sleep_deferred(delay)
104 yield self._try_to_connect(
112 rr = yield self._try_to_connect(
105 furl_or_file, 1.5*delay, max_tries, attempt+1
113 furl_or_file, 1.5*delay, max_tries, attempt+1
106 )
114 )
115 # rr becomes an int when there is a connection!!!
116 returnValue(rr)
107 else:
117 else:
108 result = yield self._register(rr)
118 returnValue(rr)
109 returnValue(result)
110 else:
119 else:
111 raise EngineConnectorError(
120 raise EngineConnectorError(
112 'Could not connect to controller, max_tries (%r) exceeded. '
121 'Could not connect to controller, max_tries (%r) exceeded. '
113 'This usually means that i) the controller was not started, '
122 'This usually means that i) the controller was not started, '
114 'or ii) a firewall was blocking the engine from connecting '
123 'or ii) a firewall was blocking the engine from connecting '
115 'to the controller.' % max_tries
124 'to the controller.' % max_tries
116 )
125 )
117
126
118 def _register(self, rr):
127 def _register(self, rr):
119 self.remote_ref = rr
128 self.remote_ref = rr
120 # Now register myself with the controller
129 # Now register myself with the controller
121 desired_id = self.engine_service.id
130 desired_id = self.engine_service.id
122 d = self.remote_ref.callRemote('register_engine', self.engine_reference,
131 d = self.remote_ref.callRemote('register_engine', self.engine_reference,
123 desired_id, os.getpid(), pickle.dumps(self.engine_service.properties,2))
132 desired_id, os.getpid(), pickle.dumps(self.engine_service.properties,2))
124 return d.addCallback(self._reference_sent)
133 return d.addCallback(self._reference_sent)
125
134
126 def _reference_sent(self, registration_dict):
135 def _reference_sent(self, registration_dict):
127 self.engine_service.id = registration_dict['id']
136 self.engine_service.id = registration_dict['id']
128 log.msg("engine registration succeeded, got id: %r" % self.engine_service.id)
137 log.msg("engine registration succeeded, got id: %r" % self.engine_service.id)
129 return self.engine_service.id
138 return self.engine_service.id
130
139
@@ -1,239 +1,266 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3 """
3 """
4 Foolscap related utilities.
4 Foolscap related utilities.
5 """
5 """
6
6
7 #-----------------------------------------------------------------------------
7 #-----------------------------------------------------------------------------
8 # Copyright (C) 2008-2009 The IPython Development Team
8 # Copyright (C) 2008-2009 The IPython Development Team
9 #
9 #
10 # Distributed under the terms of the BSD License. The full license is in
10 # Distributed under the terms of the BSD License. The full license is in
11 # the file COPYING, distributed as part of this software.
11 # the file COPYING, distributed as part of this software.
12 #-----------------------------------------------------------------------------
12 #-----------------------------------------------------------------------------
13
13
14 #-----------------------------------------------------------------------------
14 #-----------------------------------------------------------------------------
15 # Imports
15 # Imports
16 #-----------------------------------------------------------------------------
16 #-----------------------------------------------------------------------------
17
17
18 from __future__ import with_statement
18 from __future__ import with_statement
19
19
20 import os
20 import os
21 import tempfile
21 import tempfile
22
22
23 from twisted.internet import reactor, defer
23 from twisted.internet import reactor, defer
24 from twisted.python import log
24 from twisted.python import log
25
25
26 from foolscap import Tub, UnauthenticatedTub
26 from foolscap import Tub, UnauthenticatedTub
27
27
28 from IPython.config.loader import Config
28 from IPython.config.loader import Config
29
29
30 from IPython.kernel.configobjfactory import AdaptedConfiguredObjectFactory
30 from IPython.kernel.configobjfactory import AdaptedConfiguredObjectFactory
31
31
32 from IPython.kernel.error import SecurityError
32 from IPython.kernel.error import SecurityError
33
33
34 from IPython.utils.traitlets import Int, Str, Bool, Instance
34 from IPython.utils.traitlets import Int, Str, Bool, Instance
35 from IPython.utils.importstring import import_item
35 from IPython.utils.importstring import import_item
36
36
37 #-----------------------------------------------------------------------------
37 #-----------------------------------------------------------------------------
38 # Code
38 # Code
39 #-----------------------------------------------------------------------------
39 #-----------------------------------------------------------------------------
40
40
41
41
42 # We do this so if a user doesn't have OpenSSL installed, it will try to use
42 # We do this so if a user doesn't have OpenSSL installed, it will try to use
43 # an UnauthenticatedTub. But, they will still run into problems if they
43 # an UnauthenticatedTub. But, they will still run into problems if they
44 # try to use encrypted furls.
44 # try to use encrypted furls.
45 try:
45 try:
46 import OpenSSL
46 import OpenSSL
47 except:
47 except:
48 Tub = UnauthenticatedTub
48 Tub = UnauthenticatedTub
49 have_crypto = False
49 have_crypto = False
50 else:
50 else:
51 have_crypto = True
51 have_crypto = True
52
52
53
53
54 class FURLError(Exception):
55 pass
56
57
54 def check_furl_file_security(furl_file, secure):
58 def check_furl_file_security(furl_file, secure):
55 """Remove the old furl_file if changing security modes."""
59 """Remove the old furl_file if changing security modes."""
56 if os.path.isfile(furl_file):
60 if os.path.isfile(furl_file):
57 f = open(furl_file, 'r')
61 f = open(furl_file, 'r')
58 oldfurl = f.read().strip()
62 oldfurl = f.read().strip()
59 f.close()
63 f.close()
60 if (oldfurl.startswith('pb://') and not secure) or (oldfurl.startswith('pbu://') and secure):
64 if (oldfurl.startswith('pb://') and not secure) or (oldfurl.startswith('pbu://') and secure):
61 os.remove(furl_file)
65 os.remove(furl_file)
62
66
63
67
64 def is_secure(furl):
68 def is_secure(furl):
65 """Is the given FURL secure or not."""
69 """Is the given FURL secure or not."""
66 if is_valid(furl):
70 if is_valid(furl):
67 if furl.startswith("pb://"):
71 if furl.startswith("pb://"):
68 return True
72 return True
69 elif furl.startswith("pbu://"):
73 elif furl.startswith("pbu://"):
70 return False
74 return False
71 else:
75 else:
72 raise ValueError("invalid FURL: %s" % furl)
76 raise FURLError("invalid FURL: %s" % furl)
73
77
74
78
75 def is_valid(furl):
79 def is_valid(furl):
76 """Is the str a valid FURL or not."""
80 """Is the str a valid FURL or not."""
77 if isinstance(furl, str):
81 if isinstance(furl, str):
78 if furl.startswith("pb://") or furl.startswith("pbu://"):
82 if furl.startswith("pb://") or furl.startswith("pbu://"):
79 return True
83 return True
80 else:
84 else:
81 return False
85 return False
82
86
83
87
84 def find_furl(furl_or_file):
88 def find_furl(furl_or_file):
85 """Find, validate and return a FURL in a string or file."""
89 """Find, validate and return a FURL in a string or file."""
86 if isinstance(furl_or_file, str):
90 if isinstance(furl_or_file, str):
87 if is_valid(furl_or_file):
91 if is_valid(furl_or_file):
88 return furl_or_file
92 return furl_or_file
89 if os.path.isfile(furl_or_file):
93 if os.path.isfile(furl_or_file):
90 with open(furl_or_file, 'r') as f:
94 with open(furl_or_file, 'r') as f:
91 furl = f.read().strip()
95 furl = f.read().strip()
92 if is_valid(furl):
96 if is_valid(furl):
93 return furl
97 return furl
94 raise ValueError("Not a FURL or a file containing a FURL: %s" % furl_or_file)
98 raise FURLError("Not a valid FURL or FURL file: %s" % furl_or_file)
99
100
101 def is_valid_furl_or_file(furl_or_file):
102 """Validate a FURL or a FURL file.
103
104 If ``furl_or_file`` looks like a file, we simply make sure its directory
105 exists and that it has a ``.furl`` file extension. We don't try to see
106 if the FURL file exists or to read its contents. This is useful for
107 cases where auto re-connection is being used.
108 """
109 if isinstance(furl_or_file, str):
110 if is_valid(furl_or_file):
111 return True
112 if isinstance(furl_or_file, (str, unicode)):
113 path, furl_filename = os.path.split(furl_or_file)
114 if os.path.isdir(path) and furl_filename.endswith('.furl'):
115 return True
116 return False
117
118
119 def validate_furl_or_file(furl_or_file):
120 if not is_valid_furl_or_file(furl_or_file):
121 raise FURLError('Not a valid FURL or FURL file: %r' % furl_or_file)
95
122
96
123
97 def get_temp_furlfile(filename):
124 def get_temp_furlfile(filename):
98 """Return a temporary FURL file."""
125 """Return a temporary FURL file."""
99 return tempfile.mktemp(dir=os.path.dirname(filename),
126 return tempfile.mktemp(dir=os.path.dirname(filename),
100 prefix=os.path.basename(filename))
127 prefix=os.path.basename(filename))
101
128
102
129
103 def make_tub(ip, port, secure, cert_file):
130 def make_tub(ip, port, secure, cert_file):
104 """Create a listening tub given an ip, port, and cert_file location.
131 """Create a listening tub given an ip, port, and cert_file location.
105
132
106 Parameters
133 Parameters
107 ----------
134 ----------
108 ip : str
135 ip : str
109 The ip address or hostname that the tub should listen on.
136 The ip address or hostname that the tub should listen on.
110 Empty means all interfaces.
137 Empty means all interfaces.
111 port : int
138 port : int
112 The port that the tub should listen on. A value of 0 means
139 The port that the tub should listen on. A value of 0 means
113 pick a random port
140 pick a random port
114 secure: bool
141 secure: bool
115 Will the connection be secure (in the Foolscap sense).
142 Will the connection be secure (in the Foolscap sense).
116 cert_file: str
143 cert_file: str
117 A filename of a file to be used for theSSL certificate.
144 A filename of a file to be used for theSSL certificate.
118
145
119 Returns
146 Returns
120 -------
147 -------
121 A tub, listener tuple.
148 A tub, listener tuple.
122 """
149 """
123 if secure:
150 if secure:
124 if have_crypto:
151 if have_crypto:
125 tub = Tub(certFile=cert_file)
152 tub = Tub(certFile=cert_file)
126 else:
153 else:
127 raise SecurityError("OpenSSL/pyOpenSSL is not available, so we "
154 raise SecurityError("OpenSSL/pyOpenSSL is not available, so we "
128 "can't run in secure mode. Try running without "
155 "can't run in secure mode. Try running without "
129 "security using 'ipcontroller -xy'.")
156 "security using 'ipcontroller -xy'.")
130 else:
157 else:
131 tub = UnauthenticatedTub()
158 tub = UnauthenticatedTub()
132
159
133 # Set the strport based on the ip and port and start listening
160 # Set the strport based on the ip and port and start listening
134 if ip == '':
161 if ip == '':
135 strport = "tcp:%i" % port
162 strport = "tcp:%i" % port
136 else:
163 else:
137 strport = "tcp:%i:interface=%s" % (port, ip)
164 strport = "tcp:%i:interface=%s" % (port, ip)
138 log.msg("Starting listener with [secure=%r] on: %s" % (secure, strport))
165 log.msg("Starting listener with [secure=%r] on: %s" % (secure, strport))
139 listener = tub.listenOn(strport)
166 listener = tub.listenOn(strport)
140
167
141 return tub, listener
168 return tub, listener
142
169
143
170
144 class FCServiceFactory(AdaptedConfiguredObjectFactory):
171 class FCServiceFactory(AdaptedConfiguredObjectFactory):
145 """This class creates a tub with various services running in it.
172 """This class creates a tub with various services running in it.
146
173
147 The basic idea is that :meth:`create` returns a running :class:`Tub`
174 The basic idea is that :meth:`create` returns a running :class:`Tub`
148 instance that has a number of Foolscap references registered in it.
175 instance that has a number of Foolscap references registered in it.
149 This class is a subclass of :class:`IPython.core.component.Component`
176 This class is a subclass of :class:`IPython.core.component.Component`
150 so the IPython configuration and component system are used.
177 so the IPython configuration and component system are used.
151
178
152 Attributes
179 Attributes
153 ----------
180 ----------
154 interfaces : Config
181 interfaces : Config
155 A Config instance whose values are sub-Config objects having two
182 A Config instance whose values are sub-Config objects having two
156 keys: furl_file and interface_chain.
183 keys: furl_file and interface_chain.
157
184
158 The other attributes are the standard ones for Foolscap.
185 The other attributes are the standard ones for Foolscap.
159 """
186 """
160
187
161 ip = Str('', config=True)
188 ip = Str('', config=True)
162 port = Int(0, config=True)
189 port = Int(0, config=True)
163 secure = Bool(True, config=True)
190 secure = Bool(True, config=True)
164 cert_file = Str('', config=True)
191 cert_file = Str('', config=True)
165 location = Str('', config=True)
192 location = Str('', config=True)
166 reuse_furls = Bool(False, config=True)
193 reuse_furls = Bool(False, config=True)
167 interfaces = Instance(klass=Config, kw={}, allow_none=False, config=True)
194 interfaces = Instance(klass=Config, kw={}, allow_none=False, config=True)
168
195
169 def __init__(self, config, adaptee):
196 def __init__(self, config, adaptee):
170 super(FCServiceFactory, self).__init__(config, adaptee)
197 super(FCServiceFactory, self).__init__(config, adaptee)
171 self._check_reuse_furls()
198 self._check_reuse_furls()
172
199
173 def _ip_changed(self, name, old, new):
200 def _ip_changed(self, name, old, new):
174 if new == 'localhost' or new == '127.0.0.1':
201 if new == 'localhost' or new == '127.0.0.1':
175 self.location = '127.0.0.1'
202 self.location = '127.0.0.1'
176
203
177 def _check_reuse_furls(self):
204 def _check_reuse_furls(self):
178 furl_files = [i.furl_file for i in self.interfaces.values()]
205 furl_files = [i.furl_file for i in self.interfaces.values()]
179 for ff in furl_files:
206 for ff in furl_files:
180 fullfile = self._get_security_file(ff)
207 fullfile = self._get_security_file(ff)
181 if self.reuse_furls:
208 if self.reuse_furls:
182 log.msg("Reusing FURL file: %s" % fullfile)
209 log.msg("Reusing FURL file: %s" % fullfile)
183 else:
210 else:
184 if os.path.isfile(fullfile):
211 if os.path.isfile(fullfile):
185 log.msg("Removing old FURL file: %s" % fullfile)
212 log.msg("Removing old FURL file: %s" % fullfile)
186 os.remove(fullfile)
213 os.remove(fullfile)
187
214
188 def _get_security_file(self, filename):
215 def _get_security_file(self, filename):
189 return os.path.join(self.config.Global.security_dir, filename)
216 return os.path.join(self.config.Global.security_dir, filename)
190
217
191 def create(self):
218 def create(self):
192 """Create and return the Foolscap tub with everything running."""
219 """Create and return the Foolscap tub with everything running."""
193
220
194 self.tub, self.listener = make_tub(
221 self.tub, self.listener = make_tub(
195 self.ip, self.port, self.secure,
222 self.ip, self.port, self.secure,
196 self._get_security_file(self.cert_file)
223 self._get_security_file(self.cert_file)
197 )
224 )
198 # log.msg("Interfaces to register [%r]: %r" % \
225 # log.msg("Interfaces to register [%r]: %r" % \
199 # (self.__class__, self.interfaces))
226 # (self.__class__, self.interfaces))
200 if not self.secure:
227 if not self.secure:
201 log.msg("WARNING: running with no security: %s" % \
228 log.msg("WARNING: running with no security: %s" % \
202 self.__class__.__name__)
229 self.__class__.__name__)
203 reactor.callWhenRunning(self.set_location_and_register)
230 reactor.callWhenRunning(self.set_location_and_register)
204 return self.tub
231 return self.tub
205
232
206 def set_location_and_register(self):
233 def set_location_and_register(self):
207 """Set the location for the tub and return a deferred."""
234 """Set the location for the tub and return a deferred."""
208
235
209 if self.location == '':
236 if self.location == '':
210 d = self.tub.setLocationAutomatically()
237 d = self.tub.setLocationAutomatically()
211 else:
238 else:
212 d = defer.maybeDeferred(self.tub.setLocation,
239 d = defer.maybeDeferred(self.tub.setLocation,
213 "%s:%i" % (self.location, self.listener.getPortnum()))
240 "%s:%i" % (self.location, self.listener.getPortnum()))
214 self.adapt_to_interfaces(d)
241 self.adapt_to_interfaces(d)
215
242
216 def adapt_to_interfaces(self, d):
243 def adapt_to_interfaces(self, d):
217 """Run through the interfaces, adapt and register."""
244 """Run through the interfaces, adapt and register."""
218
245
219 for ifname, ifconfig in self.interfaces.iteritems():
246 for ifname, ifconfig in self.interfaces.iteritems():
220 ff = self._get_security_file(ifconfig.furl_file)
247 ff = self._get_security_file(ifconfig.furl_file)
221 log.msg("Adapting [%s] to interface: %s" % \
248 log.msg("Adapting [%s] to interface: %s" % \
222 (self.adaptee.__class__.__name__, ifname))
249 (self.adaptee.__class__.__name__, ifname))
223 log.msg("Saving FURL for interface [%s] to file: %s" % (ifname, ff))
250 log.msg("Saving FURL for interface [%s] to file: %s" % (ifname, ff))
224 check_furl_file_security(ff, self.secure)
251 check_furl_file_security(ff, self.secure)
225 adaptee = self.adaptee
252 adaptee = self.adaptee
226 for i in ifconfig.interface_chain:
253 for i in ifconfig.interface_chain:
227 adaptee = import_item(i)(adaptee)
254 adaptee = import_item(i)(adaptee)
228 d.addCallback(self.register, adaptee, furl_file=ff)
255 d.addCallback(self.register, adaptee, furl_file=ff)
229
256
230 def register(self, empty, ref, furl_file):
257 def register(self, empty, ref, furl_file):
231 """Register the reference with the FURL file.
258 """Register the reference with the FURL file.
232
259
233 The FURL file is created and then moved to make sure that when the
260 The FURL file is created and then moved to make sure that when the
234 file appears, the buffer has been flushed and the file closed.
261 file appears, the buffer has been flushed and the file closed.
235 """
262 """
236 temp_furl_file = get_temp_furlfile(furl_file)
263 temp_furl_file = get_temp_furlfile(furl_file)
237 self.tub.registerReference(ref, furlFile=temp_furl_file)
264 self.tub.registerReference(ref, furlFile=temp_furl_file)
238 os.rename(temp_furl_file, furl_file)
265 os.rename(temp_furl_file, furl_file)
239
266
@@ -1,263 +1,274 b''
1 #!/usr/bin/env python
1 #!/usr/bin/env python
2 # encoding: utf-8
2 # encoding: utf-8
3
3
4 """Things directly related to all of twisted."""
4 """Things directly related to all of twisted."""
5
5
6 #-----------------------------------------------------------------------------
6 #-----------------------------------------------------------------------------
7 # Copyright (C) 2008-2009 The IPython Development Team
7 # Copyright (C) 2008-2009 The IPython Development Team
8 #
8 #
9 # Distributed under the terms of the BSD License. The full license is in
9 # Distributed under the terms of the BSD License. The full license is in
10 # the file COPYING, distributed as part of this software.
10 # the file COPYING, distributed as part of this software.
11 #-----------------------------------------------------------------------------
11 #-----------------------------------------------------------------------------
12
12
13 #-----------------------------------------------------------------------------
13 #-----------------------------------------------------------------------------
14 # Imports
14 # Imports
15 #-----------------------------------------------------------------------------
15 #-----------------------------------------------------------------------------
16
16
17 import os, sys
17 import os, sys
18 import threading, Queue, atexit
18 import threading, Queue, atexit
19
19
20 import twisted
20 import twisted
21 from twisted.internet import defer, reactor
21 from twisted.internet import defer, reactor
22 from twisted.python import log, failure
22 from twisted.python import log, failure
23
23
24 from IPython.kernel.error import FileTimeoutError
24 from IPython.kernel.error import FileTimeoutError
25
25
26 #-----------------------------------------------------------------------------
26 #-----------------------------------------------------------------------------
27 # Classes related to twisted and threads
27 # Classes related to twisted and threads
28 #-----------------------------------------------------------------------------
28 #-----------------------------------------------------------------------------
29
29
30
30
31 class ReactorInThread(threading.Thread):
31 class ReactorInThread(threading.Thread):
32 """Run the twisted reactor in a different thread.
32 """Run the twisted reactor in a different thread.
33
33
34 For the process to be able to exit cleanly, do the following:
34 For the process to be able to exit cleanly, do the following:
35
35
36 rit = ReactorInThread()
36 rit = ReactorInThread()
37 rit.setDaemon(True)
37 rit.setDaemon(True)
38 rit.start()
38 rit.start()
39
39
40 """
40 """
41
41
42 def run(self):
42 def run(self):
43 """Run the twisted reactor in a thread.
44
45 This runs the reactor with installSignalHandlers=0, which prevents
46 twisted from installing any of its own signal handlers. This needs to
47 be disabled because signal.signal can't be called in a thread. The
48 only problem with this is that SIGCHLD events won't be detected so
49 spawnProcess won't detect that its processes have been killed by
50 an external factor.
51 """
43 reactor.run(installSignalHandlers=0)
52 reactor.run(installSignalHandlers=0)
44 # self.join()
53 # self.join()
45
54
46 def stop(self):
55 def stop(self):
47 # I don't think this does anything useful.
56 # I don't think this does anything useful.
48 blockingCallFromThread(reactor.stop)
57 blockingCallFromThread(reactor.stop)
49 self.join()
58 self.join()
50
59
51 if(twisted.version.major >= 8):
60 if(twisted.version.major >= 8):
52 import twisted.internet.threads
61 import twisted.internet.threads
53 def blockingCallFromThread(f, *a, **kw):
62 def blockingCallFromThread(f, *a, **kw):
54 """
63 """
55 Run a function in the reactor from a thread, and wait for the result
64 Run a function in the reactor from a thread, and wait for the result
56 synchronously, i.e. until the callback chain returned by the function get a
65 synchronously, i.e. until the callback chain returned by the function get a
57 result.
66 result.
58
67
59 Delegates to twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw),
68 Delegates to twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw),
60 passing twisted.internet.reactor for the first argument.
69 passing twisted.internet.reactor for the first argument.
61
70
62 @param f: the callable to run in the reactor thread
71 @param f: the callable to run in the reactor thread
63 @type f: any callable.
72 @type f: any callable.
64 @param a: the arguments to pass to C{f}.
73 @param a: the arguments to pass to C{f}.
65 @param kw: the keyword arguments to pass to C{f}.
74 @param kw: the keyword arguments to pass to C{f}.
66
75
67 @return: the result of the callback chain.
76 @return: the result of the callback chain.
68 @raise: any error raised during the callback chain.
77 @raise: any error raised during the callback chain.
69 """
78 """
70 return twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw)
79 return twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw)
71
80
72 else:
81 else:
73 def blockingCallFromThread(f, *a, **kw):
82 def blockingCallFromThread(f, *a, **kw):
74 """
83 """
75 Run a function in the reactor from a thread, and wait for the result
84 Run a function in the reactor from a thread, and wait for the result
76 synchronously, i.e. until the callback chain returned by the function get a
85 synchronously, i.e. until the callback chain returned by the function get a
77 result.
86 result.
78
87
79 @param f: the callable to run in the reactor thread
88 @param f: the callable to run in the reactor thread
80 @type f: any callable.
89 @type f: any callable.
81 @param a: the arguments to pass to C{f}.
90 @param a: the arguments to pass to C{f}.
82 @param kw: the keyword arguments to pass to C{f}.
91 @param kw: the keyword arguments to pass to C{f}.
83
92
84 @return: the result of the callback chain.
93 @return: the result of the callback chain.
85 @raise: any error raised during the callback chain.
94 @raise: any error raised during the callback chain.
86 """
95 """
87 from twisted.internet import reactor
96 from twisted.internet import reactor
88 queue = Queue.Queue()
97 queue = Queue.Queue()
89 def _callFromThread():
98 def _callFromThread():
90 result = defer.maybeDeferred(f, *a, **kw)
99 result = defer.maybeDeferred(f, *a, **kw)
91 result.addBoth(queue.put)
100 result.addBoth(queue.put)
92
101
93 reactor.callFromThread(_callFromThread)
102 reactor.callFromThread(_callFromThread)
94 result = queue.get()
103 result = queue.get()
95 if isinstance(result, failure.Failure):
104 if isinstance(result, failure.Failure):
96 # This makes it easier for the debugger to get access to the instance
105 # This makes it easier for the debugger to get access to the instance
97 try:
106 try:
98 result.raiseException()
107 result.raiseException()
99 except Exception, e:
108 except Exception, e:
100 raise e
109 raise e
101 return result
110 return result
102
111
103
112
104
113
105 #-------------------------------------------------------------------------------
114 #-------------------------------------------------------------------------------
106 # Things for managing deferreds
115 # Things for managing deferreds
107 #-------------------------------------------------------------------------------
116 #-------------------------------------------------------------------------------
108
117
109
118
110 def parseResults(results):
119 def parseResults(results):
111 """Pull out results/Failures from a DeferredList."""
120 """Pull out results/Failures from a DeferredList."""
112 return [x[1] for x in results]
121 return [x[1] for x in results]
113
122
114 def gatherBoth(dlist, fireOnOneCallback=0,
123 def gatherBoth(dlist, fireOnOneCallback=0,
115 fireOnOneErrback=0,
124 fireOnOneErrback=0,
116 consumeErrors=0,
125 consumeErrors=0,
117 logErrors=0):
126 logErrors=0):
118 """This is like gatherBoth, but sets consumeErrors=1."""
127 """This is like gatherBoth, but sets consumeErrors=1."""
119 d = DeferredList(dlist, fireOnOneCallback, fireOnOneErrback,
128 d = DeferredList(dlist, fireOnOneCallback, fireOnOneErrback,
120 consumeErrors, logErrors)
129 consumeErrors, logErrors)
121 if not fireOnOneCallback:
130 if not fireOnOneCallback:
122 d.addCallback(parseResults)
131 d.addCallback(parseResults)
123 return d
132 return d
124
133
125 SUCCESS = True
134 SUCCESS = True
126 FAILURE = False
135 FAILURE = False
127
136
128 class DeferredList(defer.Deferred):
137 class DeferredList(defer.Deferred):
129 """I combine a group of deferreds into one callback.
138 """I combine a group of deferreds into one callback.
130
139
131 I track a list of L{Deferred}s for their callbacks, and make a single
140 I track a list of L{Deferred}s for their callbacks, and make a single
132 callback when they have all completed, a list of (success, result)
141 callback when they have all completed, a list of (success, result)
133 tuples, 'success' being a boolean.
142 tuples, 'success' being a boolean.
134
143
135 Note that you can still use a L{Deferred} after putting it in a
144 Note that you can still use a L{Deferred} after putting it in a
136 DeferredList. For example, you can suppress 'Unhandled error in Deferred'
145 DeferredList. For example, you can suppress 'Unhandled error in Deferred'
137 messages by adding errbacks to the Deferreds *after* putting them in the
146 messages by adding errbacks to the Deferreds *after* putting them in the
138 DeferredList, as a DeferredList won't swallow the errors. (Although a more
147 DeferredList, as a DeferredList won't swallow the errors. (Although a more
139 convenient way to do this is simply to set the consumeErrors flag)
148 convenient way to do this is simply to set the consumeErrors flag)
140
149
141 Note: This is a modified version of the twisted.internet.defer.DeferredList
150 Note: This is a modified version of the twisted.internet.defer.DeferredList
142 """
151 """
143
152
144 fireOnOneCallback = 0
153 fireOnOneCallback = 0
145 fireOnOneErrback = 0
154 fireOnOneErrback = 0
146
155
147 def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
156 def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
148 consumeErrors=0, logErrors=0):
157 consumeErrors=0, logErrors=0):
149 """Initialize a DeferredList.
158 """Initialize a DeferredList.
150
159
151 @type deferredList: C{list} of L{Deferred}s
160 @type deferredList: C{list} of L{Deferred}s
152 @param deferredList: The list of deferreds to track.
161 @param deferredList: The list of deferreds to track.
153 @param fireOnOneCallback: (keyword param) a flag indicating that
162 @param fireOnOneCallback: (keyword param) a flag indicating that
154 only one callback needs to be fired for me to call
163 only one callback needs to be fired for me to call
155 my callback
164 my callback
156 @param fireOnOneErrback: (keyword param) a flag indicating that
165 @param fireOnOneErrback: (keyword param) a flag indicating that
157 only one errback needs to be fired for me to call
166 only one errback needs to be fired for me to call
158 my errback
167 my errback
159 @param consumeErrors: (keyword param) a flag indicating that any errors
168 @param consumeErrors: (keyword param) a flag indicating that any errors
160 raised in the original deferreds should be
169 raised in the original deferreds should be
161 consumed by this DeferredList. This is useful to
170 consumed by this DeferredList. This is useful to
162 prevent spurious warnings being logged.
171 prevent spurious warnings being logged.
163 """
172 """
164 self.resultList = [None] * len(deferredList)
173 self.resultList = [None] * len(deferredList)
165 defer.Deferred.__init__(self)
174 defer.Deferred.__init__(self)
166 if len(deferredList) == 0 and not fireOnOneCallback:
175 if len(deferredList) == 0 and not fireOnOneCallback:
167 self.callback(self.resultList)
176 self.callback(self.resultList)
168
177
169 # These flags need to be set *before* attaching callbacks to the
178 # These flags need to be set *before* attaching callbacks to the
170 # deferreds, because the callbacks use these flags, and will run
179 # deferreds, because the callbacks use these flags, and will run
171 # synchronously if any of the deferreds are already fired.
180 # synchronously if any of the deferreds are already fired.
172 self.fireOnOneCallback = fireOnOneCallback
181 self.fireOnOneCallback = fireOnOneCallback
173 self.fireOnOneErrback = fireOnOneErrback
182 self.fireOnOneErrback = fireOnOneErrback
174 self.consumeErrors = consumeErrors
183 self.consumeErrors = consumeErrors
175 self.logErrors = logErrors
184 self.logErrors = logErrors
176 self.finishedCount = 0
185 self.finishedCount = 0
177
186
178 index = 0
187 index = 0
179 for deferred in deferredList:
188 for deferred in deferredList:
180 deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
189 deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
181 callbackArgs=(index,SUCCESS),
190 callbackArgs=(index,SUCCESS),
182 errbackArgs=(index,FAILURE))
191 errbackArgs=(index,FAILURE))
183 index = index + 1
192 index = index + 1
184
193
185 def _cbDeferred(self, result, index, succeeded):
194 def _cbDeferred(self, result, index, succeeded):
186 """(internal) Callback for when one of my deferreds fires.
195 """(internal) Callback for when one of my deferreds fires.
187 """
196 """
188 self.resultList[index] = (succeeded, result)
197 self.resultList[index] = (succeeded, result)
189
198
190 self.finishedCount += 1
199 self.finishedCount += 1
191 if not self.called:
200 if not self.called:
192 if succeeded == SUCCESS and self.fireOnOneCallback:
201 if succeeded == SUCCESS and self.fireOnOneCallback:
193 self.callback((result, index))
202 self.callback((result, index))
194 elif succeeded == FAILURE and self.fireOnOneErrback:
203 elif succeeded == FAILURE and self.fireOnOneErrback:
195 # We have modified this to fire the errback chain with the actual
204 # We have modified this to fire the errback chain with the actual
196 # Failure instance the originally occured rather than twisted's
205 # Failure instance the originally occured rather than twisted's
197 # FirstError which wraps the failure
206 # FirstError which wraps the failure
198 self.errback(result)
207 self.errback(result)
199 elif self.finishedCount == len(self.resultList):
208 elif self.finishedCount == len(self.resultList):
200 self.callback(self.resultList)
209 self.callback(self.resultList)
201
210
202 if succeeded == FAILURE and self.logErrors:
211 if succeeded == FAILURE and self.logErrors:
203 log.err(result)
212 log.err(result)
204 if succeeded == FAILURE and self.consumeErrors:
213 if succeeded == FAILURE and self.consumeErrors:
205 result = None
214 result = None
206
215
207 return result
216 return result
208
217
209
218
210 def wait_for_file(filename, delay=0.1, max_tries=10):
219 def wait_for_file(filename, delay=0.1, max_tries=10):
211 """Wait (poll) for a file to be created.
220 """Wait (poll) for a file to be created.
212
221
213 This method returns a Deferred that will fire when a file exists. It
222 This method returns a Deferred that will fire when a file exists. It
214 works by polling os.path.isfile in time intervals specified by the
223 works by polling os.path.isfile in time intervals specified by the
215 delay argument. If `max_tries` is reached, it will errback with a
224 delay argument. If `max_tries` is reached, it will errback with a
216 `FileTimeoutError`.
225 `FileTimeoutError`.
217
226
218 Parameters
227 Parameters
219 ----------
228 ----------
220 filename : str
229 filename : str
221 The name of the file to wait for.
230 The name of the file to wait for.
222 delay : float
231 delay : float
223 The time to wait between polls.
232 The time to wait between polls.
224 max_tries : int
233 max_tries : int
225 The max number of attempts before raising `FileTimeoutError`
234 The max number of attempts before raising `FileTimeoutError`
226
235
227 Returns
236 Returns
228 -------
237 -------
229 d : Deferred
238 d : Deferred
230 A Deferred instance that will fire when the file exists.
239 A Deferred instance that will fire when the file exists.
231 """
240 """
232
241
233 d = defer.Deferred()
242 d = defer.Deferred()
234
243
235 def _test_for_file(filename, attempt=0):
244 def _test_for_file(filename, attempt=0):
236 if attempt >= max_tries:
245 if attempt >= max_tries:
237 d.errback(FileTimeoutError(
246 d.errback(FileTimeoutError(
238 'timeout waiting for file to be created: %s' % filename
247 'timeout waiting for file to be created: %s' % filename
239 ))
248 ))
240 else:
249 else:
241 if os.path.isfile(filename):
250 if os.path.isfile(filename):
242 d.callback(True)
251 d.callback(True)
243 else:
252 else:
244 reactor.callLater(delay, _test_for_file, filename, attempt+1)
253 reactor.callLater(delay, _test_for_file, filename, attempt+1)
245
254
246 _test_for_file(filename)
255 _test_for_file(filename)
247 return d
256 return d
248
257
249
258
250 def sleep_deferred(seconds):
259 def sleep_deferred(seconds):
251 """Sleep without blocking the event loop."""
260 """Sleep without blocking the event loop."""
252 d = defer.Deferred()
261 d = defer.Deferred()
253 reactor.callLater(seconds, d.callback, seconds)
262 reactor.callLater(seconds, d.callback, seconds)
254 return d
263 return d
255
264
256
265
257 def make_deferred(func):
266 def make_deferred(func):
258 """A decorator that calls a function with :func`maybeDeferred`."""
267 """A decorator that calls a function with :func`maybeDeferred`."""
259
268
260 def _wrapper(*args, **kwargs):
269 def _wrapper(*args, **kwargs):
261 return defer.maybeDeferred(func, *args, **kwargs)
270 return defer.maybeDeferred(func, *args, **kwargs)
262
271
263 return _wrapper No newline at end of file
272 return _wrapper
273
274
General Comments 0
You need to be logged in to leave comments. Login now