##// END OF EJS Templates
Work on engine/client reconnect logic.
Brian Granger -
Show More
@@ -1,655 +1,713 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Facilities for handling client connections to the controller."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008-2009 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 from __future__ import with_statement
18 18 import os
19 19
20 from IPython.kernel.fcutil import Tub, find_furl
20 from IPython.kernel.fcutil import (
21 Tub,
22 find_furl,
23 is_valid_furl_or_file,
24 validate_furl_or_file,
25 FURLError
26 )
21 27 from IPython.kernel.clusterdir import ClusterDir, ClusterDirError
22 28 from IPython.kernel.launcher import IPClusterLauncher
23 from IPython.kernel.twistedutil import gatherBoth, make_deferred
24 from IPython.kernel.twistedutil import blockingCallFromThread
25
29 from IPython.kernel.twistedutil import (
30 gatherBoth,
31 make_deferred,
32 blockingCallFromThread,
33 sleep_deferred
34 )
26 35 from IPython.utils.importstring import import_item
27 36 from IPython.utils.genutils import get_ipython_dir
28 37
29 38 from twisted.internet import defer
30 from twisted.python import failure
39 from twisted.internet.defer import inlineCallbacks, returnValue
40 from twisted.python import failure, log
31 41
32 42 #-----------------------------------------------------------------------------
33 43 # The ClientConnector class
34 44 #-----------------------------------------------------------------------------
35 45
46 DELAY = 0.2
47 MAX_TRIES = 9
48
49
50 class ClientConnectorError(Exception):
51 pass
52
36 53
37 54 class AsyncClientConnector(object):
38 55 """A class for getting remote references and clients from furls.
39 56
40 57 This start a single :class:`Tub` for all remote reference and caches
41 58 references.
42 59 """
43 60
44 61 def __init__(self):
45 62 self._remote_refs = {}
46 63 self.tub = Tub()
47 64 self.tub.startService()
48 65
49 66 def _find_furl(self, profile='default', cluster_dir=None,
50 67 furl_or_file=None, furl_file_name=None,
51 68 ipythondir=None):
52 69 """Find a FURL file by profile+ipythondir or cluster dir.
53 70
54 This raises an exception if a FURL file can't be found.
71 This raises an :exc:`~IPython.kernel.fcutil.FURLError` exception
72 if a FURL file can't be found.
55 73 """
56 74 # Try by furl_or_file
57 75 if furl_or_file is not None:
58 try:
59 furl = find_furl(furl_or_file)
60 except ValueError:
61 return furl
76 validate_furl_or_file(furl_or_file)
77 return furl_or_file
62 78
63 79 if furl_file_name is None:
64 raise ValueError('A furl_file_name must be provided')
80 raise FURLError('A furl_file_name must be provided')
65 81
66 82 # Try by cluster_dir
67 83 if cluster_dir is not None:
68 84 cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
69 85 sdir = cluster_dir_obj.security_dir
70 86 furl_file = os.path.join(sdir, furl_file_name)
71 return find_furl(furl_file)
87 validate_furl_or_file(furl_file)
88 return furl_file
72 89
73 90 # Try by profile
74 91 if ipythondir is None:
75 92 ipythondir = get_ipython_dir()
76 93 if profile is not None:
77 94 cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
78 95 ipythondir, profile)
79 96 sdir = cluster_dir_obj.security_dir
80 97 furl_file = os.path.join(sdir, furl_file_name)
81 return find_furl(furl_file)
98 validate_furl_or_file(furl_file)
99 return furl_file
82 100
83 raise ValueError('Could not find a valid FURL file.')
101 raise FURLError('Could not find a valid FURL file.')
84 102
85 103 def get_reference(self, furl_or_file):
86 104 """Get a remote reference using a furl or a file containing a furl.
87 105
88 106 Remote references are cached locally so once a remote reference
89 107 has been retrieved for a given furl, the cached version is
90 108 returned.
91 109
92 110 Parameters
93 111 ----------
94 112 furl_or_file : str
95 A furl or a filename containing a furl
113 A furl or a filename containing a furl. This should already be
114 validated, but might not yet exist.
96 115
97 116 Returns
98 117 -------
99 118 A deferred to a remote reference
100 119 """
101 furl = find_furl(furl_or_file)
120 furl = furl_or_file
102 121 if furl in self._remote_refs:
103 122 d = defer.succeed(self._remote_refs[furl])
104 123 else:
105 124 d = self.tub.getReference(furl)
106 125 d.addCallback(self._save_ref, furl)
107 126 return d
108 127
109 128 def _save_ref(self, ref, furl):
110 129 """Cache a remote reference by its furl."""
111 130 self._remote_refs[furl] = ref
112 131 return ref
113 132
114 133 def get_task_client(self, profile='default', cluster_dir=None,
115 furl_or_file=None, ipythondir=None):
134 furl_or_file=None, ipythondir=None,
135 delay=DELAY, max_tries=MAX_TRIES):
116 136 """Get the task controller client.
117 137
118 138 This method is a simple wrapper around `get_client` that passes in
119 139 the default name of the task client FURL file. Usually only
120 140 the ``profile`` option will be needed. If a FURL file can't be
121 141 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
122 142
123 143 Parameters
124 144 ----------
125 145 profile : str
126 146 The name of a cluster directory profile (default="default"). The
127 147 cluster directory "cluster_<profile>" will be searched for
128 148 in ``os.getcwd()``, the ipythondir and then in the directories
129 149 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
130 150 cluster_dir : str
131 151 The full path to a cluster directory. This is useful if profiles
132 152 are not being used.
133 153 furl_or_file : str
134 154 A furl or a filename containing a FURLK. This is useful if you
135 155 simply know the location of the FURL file.
136 156 ipythondir : str
137 157 The location of the ipythondir if different from the default.
138 158 This is used if the cluster directory is being found by profile.
139 159
140 160 Returns
141 161 -------
142 162 A deferred to the actual client class.
143 163 """
144 164 return self.get_client(
145 165 profile, cluster_dir, furl_or_file,
146 'ipcontroller-tc.furl', ipythondir
166 'ipcontroller-tc.furl', ipythondir,
167 delay, max_tries
147 168 )
148 169
149 170 def get_multiengine_client(self, profile='default', cluster_dir=None,
150 furl_or_file=None, ipythondir=None):
171 furl_or_file=None, ipythondir=None,
172 delay=DELAY, max_tries=MAX_TRIES):
151 173 """Get the multiengine controller client.
152 174
153 175 This method is a simple wrapper around `get_client` that passes in
154 176 the default name of the task client FURL file. Usually only
155 177 the ``profile`` option will be needed. If a FURL file can't be
156 178 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
157 179
158 180 Parameters
159 181 ----------
160 182 profile : str
161 183 The name of a cluster directory profile (default="default"). The
162 184 cluster directory "cluster_<profile>" will be searched for
163 185 in ``os.getcwd()``, the ipythondir and then in the directories
164 186 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
165 187 cluster_dir : str
166 188 The full path to a cluster directory. This is useful if profiles
167 189 are not being used.
168 190 furl_or_file : str
169 191 A furl or a filename containing a FURLK. This is useful if you
170 192 simply know the location of the FURL file.
171 193 ipythondir : str
172 194 The location of the ipythondir if different from the default.
173 195 This is used if the cluster directory is being found by profile.
174 196
175 197 Returns
176 198 -------
177 199 A deferred to the actual client class.
178 200 """
179 201 return self.get_client(
180 202 profile, cluster_dir, furl_or_file,
181 'ipcontroller-mec.furl', ipythondir
203 'ipcontroller-mec.furl', ipythondir,
204 delay, max_tries
182 205 )
183 206
184 207 def get_client(self, profile='default', cluster_dir=None,
185 furl_or_file=None, furl_file_name=None, ipythondir=None):
208 furl_or_file=None, furl_file_name=None, ipythondir=None,
209 delay=DELAY, max_tries=MAX_TRIES):
186 210 """Get a remote reference and wrap it in a client by furl.
187 211
188 212 This method is a simple wrapper around `get_client` that passes in
189 213 the default name of the task client FURL file. Usually only
190 214 the ``profile`` option will be needed. If a FURL file can't be
191 215 found by its profile, use ``cluster_dir`` or ``furl_or_file``.
192 216
193 217 Parameters
194 218 ----------
195 219 profile : str
196 220 The name of a cluster directory profile (default="default"). The
197 221 cluster directory "cluster_<profile>" will be searched for
198 222 in ``os.getcwd()``, the ipythondir and then in the directories
199 223 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
200 224 cluster_dir : str
201 225 The full path to a cluster directory. This is useful if profiles
202 226 are not being used.
203 227 furl_or_file : str
204 A furl or a filename containing a FURLK. This is useful if you
228 A furl or a filename containing a FURL. This is useful if you
205 229 simply know the location of the FURL file.
206 230 furl_file_name : str
207 231 The filename (not the full path) of the FURL. This must be
208 232 provided if ``furl_or_file`` is not.
209 233 ipythondir : str
210 234 The location of the ipythondir if different from the default.
211 235 This is used if the cluster directory is being found by profile.
212 236
213 237 Returns
214 238 -------
215 A deferred to the actual client class.
239 A deferred to the actual client class. Or a failure to a
240 :exc:`FURLError`.
216 241 """
217 242 try:
218 furl = self._find_furl(
243 furl_file = self._find_furl(
219 244 profile, cluster_dir, furl_or_file,
220 245 furl_file_name, ipythondir
221 246 )
222 except:
247 except FURLError:
223 248 return defer.fail(failure.Failure())
224 249
225 d = self.get_reference(furl)
226
227 250 def _wrap_remote_reference(rr):
228 251 d = rr.callRemote('get_client_name')
229 252 d.addCallback(lambda name: import_item(name))
230 253 def adapt(client_interface):
231 254 client = client_interface(rr)
232 255 client.tub = self.tub
233 256 return client
234 257 d.addCallback(adapt)
235 258
236 259 return d
237 260
261 d = self._try_to_connect(furl_file, delay, max_tries, attempt=0)
238 262 d.addCallback(_wrap_remote_reference)
239 263 return d
240 264
265 @inlineCallbacks
266 def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
267 """Try to connect to the controller with retry logic."""
268 if attempt < max_tries:
269 log.msg("Connecting to controller [%r]: %s" % \
270 (attempt, furl_or_file))
271 try:
272 self.furl = find_furl(furl_or_file)
273 # Uncomment this to see the FURL being tried.
274 # log.msg("FURL: %s" % self.furl)
275 rr = yield self.get_reference(self.furl)
276 except:
277 if attempt==max_tries-1:
278 # This will propagate the exception all the way to the top
279 # where it can be handled.
280 raise
281 else:
282 yield sleep_deferred(delay)
283 rr = yield self._try_to_connect(
284 furl_or_file, 1.5*delay, max_tries, attempt+1
285 )
286 returnValue(rr)
287 else:
288 returnValue(rr)
289 else:
290 raise ClientConnectorError(
291 'Could not connect to controller, max_tries (%r) exceeded. '
292 'This usually means that i) the controller was not started, '
293 'or ii) a firewall was blocking the client from connecting '
294 'to the controller.' % max_tries
295 )
296
241 297
242 298 class ClientConnector(object):
243 299 """A blocking version of a client connector.
244 300
245 301 This class creates a single :class:`Tub` instance and allows remote
246 302 references and client to be retrieved by their FURLs. Remote references
247 303 are cached locally and FURL files can be found using profiles and cluster
248 304 directories.
249 305 """
250 306
251 307 def __init__(self):
252 308 self.async_cc = AsyncClientConnector()
253 309
254 310 def get_task_client(self, profile='default', cluster_dir=None,
255 furl_or_file=None, ipythondir=None):
311 furl_or_file=None, ipythondir=None,
312 delay=DELAY, max_tries=MAX_TRIES):
256 313 """Get the task client.
257 314
258 315 Usually only the ``profile`` option will be needed. If a FURL file
259 316 can't be found by its profile, use ``cluster_dir`` or
260 317 ``furl_or_file``.
261 318
262 319 Parameters
263 320 ----------
264 321 profile : str
265 322 The name of a cluster directory profile (default="default"). The
266 323 cluster directory "cluster_<profile>" will be searched for
267 324 in ``os.getcwd()``, the ipythondir and then in the directories
268 325 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
269 326 cluster_dir : str
270 327 The full path to a cluster directory. This is useful if profiles
271 328 are not being used.
272 329 furl_or_file : str
273 330 A furl or a filename containing a FURLK. This is useful if you
274 331 simply know the location of the FURL file.
275 332 ipythondir : str
276 333 The location of the ipythondir if different from the default.
277 334 This is used if the cluster directory is being found by profile.
278 335
279 336 Returns
280 337 -------
281 338 The task client instance.
282 339 """
283 340 client = blockingCallFromThread(
284 341 self.async_cc.get_task_client, profile, cluster_dir,
285 furl_or_file, ipythondir
342 furl_or_file, ipythondir, delay, max_tries
286 343 )
287 344 return client.adapt_to_blocking_client()
288 345
289 346 def get_multiengine_client(self, profile='default', cluster_dir=None,
290 furl_or_file=None, ipythondir=None):
347 furl_or_file=None, ipythondir=None,
348 delay=DELAY, max_tries=MAX_TRIES):
291 349 """Get the multiengine client.
292 350
293 351 Usually only the ``profile`` option will be needed. If a FURL file
294 352 can't be found by its profile, use ``cluster_dir`` or
295 353 ``furl_or_file``.
296 354
297 355 Parameters
298 356 ----------
299 357 profile : str
300 358 The name of a cluster directory profile (default="default"). The
301 359 cluster directory "cluster_<profile>" will be searched for
302 360 in ``os.getcwd()``, the ipythondir and then in the directories
303 361 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
304 362 cluster_dir : str
305 363 The full path to a cluster directory. This is useful if profiles
306 364 are not being used.
307 365 furl_or_file : str
308 366 A furl or a filename containing a FURLK. This is useful if you
309 367 simply know the location of the FURL file.
310 368 ipythondir : str
311 369 The location of the ipythondir if different from the default.
312 370 This is used if the cluster directory is being found by profile.
313 371
314 372 Returns
315 373 -------
316 374 The multiengine client instance.
317 375 """
318 376 client = blockingCallFromThread(
319 377 self.async_cc.get_multiengine_client, profile, cluster_dir,
320 furl_or_file, ipythondir
378 furl_or_file, ipythondir, delay, max_tries
321 379 )
322 380 return client.adapt_to_blocking_client()
323 381
324 382 def get_client(self, profile='default', cluster_dir=None,
325 furl_or_file=None, ipythondir=None):
383 furl_or_file=None, ipythondir=None,
384 delay=DELAY, max_tries=MAX_TRIES):
326 385 client = blockingCallFromThread(
327 386 self.async_cc.get_client, profile, cluster_dir,
328 furl_or_file, ipythondir
387 furl_or_file, ipythondir,
388 delay, max_tries
329 389 )
330 390 return client.adapt_to_blocking_client()
331 391
332 392
333 393 class ClusterStateError(Exception):
334 394 pass
335 395
336 396
337 397 class AsyncCluster(object):
338 398 """An class that wraps the :command:`ipcluster` script."""
339 399
340 400 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
341 401 auto_create=False, auto_stop=True):
342 402 """Create a class to manage an IPython cluster.
343 403
344 404 This class calls the :command:`ipcluster` command with the right
345 405 options to start an IPython cluster. Typically a cluster directory
346 406 must be created (:command:`ipcluster create`) and configured before
347 407 using this class. Configuration is done by editing the
348 408 configuration files in the top level of the cluster directory.
349 409
350 410 Parameters
351 411 ----------
352 412 profile : str
353 413 The name of a cluster directory profile (default="default"). The
354 414 cluster directory "cluster_<profile>" will be searched for
355 415 in ``os.getcwd()``, the ipythondir and then in the directories
356 416 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
357 417 cluster_dir : str
358 418 The full path to a cluster directory. This is useful if profiles
359 419 are not being used.
360 furl_or_file : str
361 A furl or a filename containing a FURLK. This is useful if you
362 simply know the location of the FURL file.
363 420 ipythondir : str
364 421 The location of the ipythondir if different from the default.
365 422 This is used if the cluster directory is being found by profile.
366 423 auto_create : bool
367 424 Automatically create the cluster directory it is dones't exist.
368 425 This will usually only make sense if using a local cluster
369 426 (default=False).
370 427 auto_stop : bool
371 428 Automatically stop the cluster when this instance is garbage
372 429 collected (default=True). This is useful if you want the cluster
373 430 to live beyond your current process. There is also an instance
374 431 attribute ``auto_stop`` to change this behavior.
375 432 """
376 433 self._setup_cluster_dir(profile, cluster_dir, ipythondir, auto_create)
377 434 self.state = 'before'
378 435 self.launcher = None
379 436 self.client_connector = None
380 437 self.auto_stop = auto_stop
381 438
382 439 def __del__(self):
383 440 if self.auto_stop and self.state=='running':
384 441 print "Auto stopping the cluster..."
385 442 self.stop()
386 443
387 444 @property
388 445 def location(self):
389 446 if hasattr(self, 'cluster_dir_obj'):
390 447 return self.cluster_dir_obj.location
391 448 else:
392 449 return ''
393 450
394 451 @property
395 452 def running(self):
396 453 if self.state=='running':
397 454 return True
398 455 else:
399 456 return False
400 457
401 458 def _setup_cluster_dir(self, profile, cluster_dir, ipythondir, auto_create):
402 459 if ipythondir is None:
403 460 ipythondir = get_ipython_dir()
404 461 if cluster_dir is not None:
405 462 try:
406 463 self.cluster_dir_obj = ClusterDir.find_cluster_dir(cluster_dir)
407 464 except ClusterDirError:
408 465 pass
409 466 if profile is not None:
410 467 try:
411 468 self.cluster_dir_obj = ClusterDir.find_cluster_dir_by_profile(
412 469 ipythondir, profile)
413 470 except ClusterDirError:
414 471 pass
415 472 if auto_create or profile=='default':
416 473 # This should call 'ipcluster create --profile default
417 474 self.cluster_dir_obj = ClusterDir.create_cluster_dir_by_profile(
418 475 ipythondir, profile)
419 476 else:
420 477 raise ClusterDirError('Cluster dir not found.')
421 478
422 479 @make_deferred
423 480 def start(self, n=2):
424 481 """Start the IPython cluster with n engines.
425 482
426 483 Parameters
427 484 ----------
428 485 n : int
429 486 The number of engine to start.
430 487 """
431 488 # We might want to add logic to test if the cluster has started
432 489 # by another process....
433 490 if not self.state=='running':
434 491 self.launcher = IPClusterLauncher(os.getcwd())
435 492 self.launcher.ipcluster_n = n
436 493 self.launcher.ipcluster_subcommand = 'start'
437 494 d = self.launcher.start()
438 495 d.addCallback(self._handle_start)
439 496 return d
440 497 else:
441 498 raise ClusterStateError('Cluster is already running')
442 499
443 500 @make_deferred
444 501 def stop(self):
445 502 """Stop the IPython cluster if it is running."""
446 503 if self.state=='running':
447 504 d1 = self.launcher.observe_stop()
448 505 d1.addCallback(self._handle_stop)
449 506 d2 = self.launcher.stop()
450 507 return gatherBoth([d1, d2], consumeErrors=True)
451 508 else:
452 509 raise ClusterStateError("Cluster not running")
453 510
454 def get_multiengine_client(self):
511 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
455 512 """Get the multiengine client for the running cluster.
456 513
457 514 If this fails, it means that the cluster has not finished starting.
458 515 Usually waiting a few seconds are re-trying will solve this.
459 516 """
460 517 if self.client_connector is None:
461 518 self.client_connector = AsyncClientConnector()
462 519 return self.client_connector.get_multiengine_client(
463 cluster_dir=self.cluster_dir_obj.location
520 cluster_dir=self.cluster_dir_obj.location,
521 delay=delay, max_tries=max_tries
464 522 )
465 523
466 def get_task_client(self):
524 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
467 525 """Get the task client for the running cluster.
468 526
469 527 If this fails, it means that the cluster has not finished starting.
470 528 Usually waiting a few seconds are re-trying will solve this.
471 529 """
472 530 if self.client_connector is None:
473 531 self.client_connector = AsyncClientConnector()
474 532 return self.client_connector.get_task_client(
475 cluster_dir=self.cluster_dir_obj.location
533 cluster_dir=self.cluster_dir_obj.location,
534 delay=delay, max_tries=max_tries
476 535 )
477 536
478 537 def get_ipengine_logs(self):
479 538 return self.get_logs_by_name('ipengine')
480 539
481 540 def get_ipcontroller_logs(self):
482 541 return self.get_logs_by_name('ipcontroller')
483 542
484 543 def get_ipcluster_logs(self):
485 544 return self.get_logs_by_name('ipcluster')
486 545
487 546 def get_logs_by_name(self, name='ipcluster'):
488 547 log_dir = self.cluster_dir_obj.log_dir
489 548 logs = {}
490 549 for log in os.listdir(log_dir):
491 550 if log.startswith(name + '-') and log.endswith('.log'):
492 551 with open(os.path.join(log_dir, log), 'r') as f:
493 552 logs[log] = f.read()
494 553 return logs
495 554
496 555 def get_logs(self):
497 556 d = self.get_ipcluster_logs()
498 557 d.update(self.get_ipengine_logs())
499 558 d.update(self.get_ipcontroller_logs())
500 559 return d
501 560
502 561 def _handle_start(self, r):
503 562 self.state = 'running'
504 563
505 564 def _handle_stop(self, r):
506 565 self.state = 'after'
507 566
508 567
509 568 class Cluster(object):
510 569
511 570
512 571 def __init__(self, profile='default', cluster_dir=None, ipythondir=None,
513 572 auto_create=False, auto_stop=True):
514 573 """Create a class to manage an IPython cluster.
515 574
516 575 This class calls the :command:`ipcluster` command with the right
517 576 options to start an IPython cluster. Typically a cluster directory
518 577 must be created (:command:`ipcluster create`) and configured before
519 578 using this class. Configuration is done by editing the
520 579 configuration files in the top level of the cluster directory.
521 580
522 581 Parameters
523 582 ----------
524 583 profile : str
525 584 The name of a cluster directory profile (default="default"). The
526 585 cluster directory "cluster_<profile>" will be searched for
527 586 in ``os.getcwd()``, the ipythondir and then in the directories
528 587 listed in the :env:`IPCLUSTERDIR_PATH` environment variable.
529 588 cluster_dir : str
530 589 The full path to a cluster directory. This is useful if profiles
531 590 are not being used.
532 furl_or_file : str
533 A furl or a filename containing a FURLK. This is useful if you
534 simply know the location of the FURL file.
535 591 ipythondir : str
536 592 The location of the ipythondir if different from the default.
537 593 This is used if the cluster directory is being found by profile.
538 594 auto_create : bool
539 595 Automatically create the cluster directory it is dones't exist.
540 596 This will usually only make sense if using a local cluster
541 597 (default=False).
542 598 auto_stop : bool
543 599 Automatically stop the cluster when this instance is garbage
544 600 collected (default=True). This is useful if you want the cluster
545 601 to live beyond your current process. There is also an instance
546 602 attribute ``auto_stop`` to change this behavior.
547 603 """
548 604 self.async_cluster = AsyncCluster(
549 605 profile, cluster_dir, ipythondir, auto_create, auto_stop
550 606 )
551 607 self.cluster_dir_obj = self.async_cluster.cluster_dir_obj
552 608 self.client_connector = None
553 609
554 610 def _set_auto_stop(self, value):
555 611 self.async_cluster.auto_stop = value
556 612
557 613 def _get_auto_stop(self):
558 614 return self.async_cluster.auto_stop
559 615
560 616 auto_stop = property(_get_auto_stop, _set_auto_stop)
561 617
562 618 @property
563 619 def location(self):
564 620 return self.async_cluster.location
565 621
566 622 @property
567 623 def running(self):
568 624 return self.async_cluster.running
569 625
570 626 def start(self, n=2):
571 627 """Start the IPython cluster with n engines.
572 628
573 629 Parameters
574 630 ----------
575 631 n : int
576 632 The number of engine to start.
577 633 """
578 634 return blockingCallFromThread(self.async_cluster.start, n)
579 635
580 636 def stop(self):
581 637 """Stop the IPython cluster if it is running."""
582 638 return blockingCallFromThread(self.async_cluster.stop)
583 639
584 def get_multiengine_client(self):
640 def get_multiengine_client(self, delay=DELAY, max_tries=MAX_TRIES):
585 641 """Get the multiengine client for the running cluster.
586 642
587 643 If this fails, it means that the cluster has not finished starting.
588 644 Usually waiting a few seconds are re-trying will solve this.
589 645 """
590 646 if self.client_connector is None:
591 647 self.client_connector = ClientConnector()
592 648 return self.client_connector.get_multiengine_client(
593 cluster_dir=self.cluster_dir_obj.location
649 cluster_dir=self.cluster_dir_obj.location,
650 delay=delay, max_tries=max_tries
594 651 )
595 652
596 def get_task_client(self):
653 def get_task_client(self, delay=DELAY, max_tries=MAX_TRIES):
597 654 """Get the task client for the running cluster.
598 655
599 656 If this fails, it means that the cluster has not finished starting.
600 657 Usually waiting a few seconds are re-trying will solve this.
601 658 """
602 659 if self.client_connector is None:
603 660 self.client_connector = ClientConnector()
604 661 return self.client_connector.get_task_client(
605 cluster_dir=self.cluster_dir_obj.location
662 cluster_dir=self.cluster_dir_obj.location,
663 delay=delay, max_tries=max_tries
606 664 )
607 665
608 666 def __repr__(self):
609 667 s = "<Cluster(running=%r, location=%s)" % (self.running, self.location)
610 668 return s
611 669
612 670 def get_logs_by_name(self, name='ipcluter'):
613 671 """Get a dict of logs by process name (ipcluster, ipengine, etc.)"""
614 672 return self.async_cluster.get_logs_by_name(name)
615 673
616 674 def get_ipengine_logs(self):
617 675 """Get a dict of logs for all engines in this cluster."""
618 676 return self.async_cluster.get_ipengine_logs()
619 677
620 678 def get_ipcontroller_logs(self):
621 679 """Get a dict of logs for the controller in this cluster."""
622 680 return self.async_cluster.get_ipcontroller_logs()
623 681
624 682 def get_ipcluster_logs(self):
625 683 """Get a dict of the ipcluster logs for this cluster."""
626 684 return self.async_cluster.get_ipcluster_logs()
627 685
628 686 def get_logs(self):
629 687 """Get a dict of all logs for this cluster."""
630 688 return self.async_cluster.get_logs()
631 689
632 690 def _print_logs(self, logs):
633 691 for k, v in logs.iteritems():
634 692 print "==================================="
635 693 print "Logfile: %s" % k
636 694 print "==================================="
637 695 print v
638 696 print
639 697
640 698 def print_ipengine_logs(self):
641 699 """Print the ipengine logs for this cluster to stdout."""
642 700 self._print_logs(self.get_ipengine_logs())
643 701
644 702 def print_ipcontroller_logs(self):
645 703 """Print the ipcontroller logs for this cluster to stdout."""
646 704 self._print_logs(self.get_ipcontroller_logs())
647 705
648 706 def print_ipcluster_logs(self):
649 707 """Print the ipcluster logs for this cluster to stdout."""
650 708 self._print_logs(self.get_ipcluster_logs())
651 709
652 710 def print_logs(self):
653 711 """Print all the logs for this cluster to stdout."""
654 712 self._print_logs(self.get_logs())
655 713
@@ -1,130 +1,139 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """A class that manages the engines connection to the controller."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008-2009 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os
18 18 import cPickle as pickle
19 19
20 20 from twisted.python import log, failure
21 21 from twisted.internet import defer
22 22 from twisted.internet.defer import inlineCallbacks, returnValue
23 23
24 from IPython.kernel.fcutil import find_furl
24 from IPython.kernel.fcutil import find_furl, validate_furl_or_file
25 25 from IPython.kernel.enginefc import IFCEngine
26 from IPython.kernel.twistedutil import sleep_deferred
26 from IPython.kernel.twistedutil import sleep_deferred, make_deferred
27 27
28 28 #-----------------------------------------------------------------------------
29 29 # The ClientConnector class
30 30 #-----------------------------------------------------------------------------
31 31
32 32
33 33 class EngineConnectorError(Exception):
34 34 pass
35 35
36 36
37 37 class EngineConnector(object):
38 38 """Manage an engines connection to a controller.
39 39
40 40 This class takes a foolscap `Tub` and provides a `connect_to_controller`
41 41 method that will use the `Tub` to connect to a controller and register
42 42 the engine with the controller.
43 43 """
44 44
45 45 def __init__(self, tub):
46 46 self.tub = tub
47 47
48 @make_deferred
48 49 def connect_to_controller(self, engine_service, furl_or_file,
49 50 delay=0.1, max_tries=10):
50 51 """
51 52 Make a connection to a controller specified by a furl.
52 53
53 54 This method takes an `IEngineBase` instance and a foolcap URL and uses
54 55 the `tub` attribute to make a connection to the controller. The
55 56 foolscap URL contains all the information needed to connect to the
56 57 controller, including the ip and port as well as any encryption and
57 58 authentication information needed for the connection.
58 59
59 60 After getting a reference to the controller, this method calls the
60 61 `register_engine` method of the controller to actually register the
61 62 engine.
62 63
63 64 This method will try to connect to the controller multiple times with
64 65 a delay in between. Each time the FURL file is read anew.
65 66
66 67 Parameters
67 68 __________
68 69 engine_service : IEngineBase
69 70 An instance of an `IEngineBase` implementer
70 71 furl_or_file : str
71 72 A furl or a filename containing a furl
72 73 delay : float
73 74 The intial time to wait between connection attempts. Subsequent
74 75 attempts have increasing delays.
75 76 max_tries : int
76 77 The maximum number of connection attempts.
78
79 Returns
80 -------
81 A deferred to the registered client or a failure to an error
82 like :exc:`FURLError`.
77 83 """
78 84 if not self.tub.running:
79 85 self.tub.startService()
80 86 self.engine_service = engine_service
81 87 self.engine_reference = IFCEngine(self.engine_service)
82 88
89 validate_furl_or_file(furl_or_file)
83 90 d = self._try_to_connect(furl_or_file, delay, max_tries, attempt=0)
91 d.addCallback(self._register)
84 92 return d
85 93
86 94 @inlineCallbacks
87 95 def _try_to_connect(self, furl_or_file, delay, max_tries, attempt):
88 96 """Try to connect to the controller with retry logic."""
89 97 if attempt < max_tries:
90 98 log.msg("Attempting to connect to controller [%r]: %s" % \
91 99 (attempt, furl_or_file))
92 100 try:
93 101 self.furl = find_furl(furl_or_file)
94 102 # Uncomment this to see the FURL being tried.
95 103 # log.msg("FURL: %s" % self.furl)
96 104 rr = yield self.tub.getReference(self.furl)
97 105 except:
98 106 if attempt==max_tries-1:
99 107 # This will propagate the exception all the way to the top
100 108 # where it can be handled.
101 109 raise
102 110 else:
103 111 yield sleep_deferred(delay)
104 yield self._try_to_connect(
112 rr = yield self._try_to_connect(
105 113 furl_or_file, 1.5*delay, max_tries, attempt+1
106 114 )
115 # rr becomes an int when there is a connection!!!
116 returnValue(rr)
107 117 else:
108 result = yield self._register(rr)
109 returnValue(result)
118 returnValue(rr)
110 119 else:
111 120 raise EngineConnectorError(
112 121 'Could not connect to controller, max_tries (%r) exceeded. '
113 122 'This usually means that i) the controller was not started, '
114 123 'or ii) a firewall was blocking the engine from connecting '
115 124 'to the controller.' % max_tries
116 125 )
117 126
118 127 def _register(self, rr):
119 128 self.remote_ref = rr
120 129 # Now register myself with the controller
121 130 desired_id = self.engine_service.id
122 131 d = self.remote_ref.callRemote('register_engine', self.engine_reference,
123 132 desired_id, os.getpid(), pickle.dumps(self.engine_service.properties,2))
124 133 return d.addCallback(self._reference_sent)
125 134
126 135 def _reference_sent(self, registration_dict):
127 136 self.engine_service.id = registration_dict['id']
128 137 log.msg("engine registration succeeded, got id: %r" % self.engine_service.id)
129 138 return self.engine_service.id
130 139
@@ -1,239 +1,266 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3 """
4 4 Foolscap related utilities.
5 5 """
6 6
7 7 #-----------------------------------------------------------------------------
8 8 # Copyright (C) 2008-2009 The IPython Development Team
9 9 #
10 10 # Distributed under the terms of the BSD License. The full license is in
11 11 # the file COPYING, distributed as part of this software.
12 12 #-----------------------------------------------------------------------------
13 13
14 14 #-----------------------------------------------------------------------------
15 15 # Imports
16 16 #-----------------------------------------------------------------------------
17 17
18 18 from __future__ import with_statement
19 19
20 20 import os
21 21 import tempfile
22 22
23 23 from twisted.internet import reactor, defer
24 24 from twisted.python import log
25 25
26 26 from foolscap import Tub, UnauthenticatedTub
27 27
28 28 from IPython.config.loader import Config
29 29
30 30 from IPython.kernel.configobjfactory import AdaptedConfiguredObjectFactory
31 31
32 32 from IPython.kernel.error import SecurityError
33 33
34 34 from IPython.utils.traitlets import Int, Str, Bool, Instance
35 35 from IPython.utils.importstring import import_item
36 36
37 37 #-----------------------------------------------------------------------------
38 38 # Code
39 39 #-----------------------------------------------------------------------------
40 40
41 41
42 42 # We do this so if a user doesn't have OpenSSL installed, it will try to use
43 43 # an UnauthenticatedTub. But, they will still run into problems if they
44 44 # try to use encrypted furls.
45 45 try:
46 46 import OpenSSL
47 47 except:
48 48 Tub = UnauthenticatedTub
49 49 have_crypto = False
50 50 else:
51 51 have_crypto = True
52 52
53 53
54 class FURLError(Exception):
55 pass
56
57
54 58 def check_furl_file_security(furl_file, secure):
55 59 """Remove the old furl_file if changing security modes."""
56 60 if os.path.isfile(furl_file):
57 61 f = open(furl_file, 'r')
58 62 oldfurl = f.read().strip()
59 63 f.close()
60 64 if (oldfurl.startswith('pb://') and not secure) or (oldfurl.startswith('pbu://') and secure):
61 65 os.remove(furl_file)
62 66
63 67
64 68 def is_secure(furl):
65 69 """Is the given FURL secure or not."""
66 70 if is_valid(furl):
67 71 if furl.startswith("pb://"):
68 72 return True
69 73 elif furl.startswith("pbu://"):
70 74 return False
71 75 else:
72 raise ValueError("invalid FURL: %s" % furl)
76 raise FURLError("invalid FURL: %s" % furl)
73 77
74 78
75 79 def is_valid(furl):
76 80 """Is the str a valid FURL or not."""
77 81 if isinstance(furl, str):
78 82 if furl.startswith("pb://") or furl.startswith("pbu://"):
79 83 return True
80 84 else:
81 85 return False
82 86
83 87
84 88 def find_furl(furl_or_file):
85 89 """Find, validate and return a FURL in a string or file."""
86 90 if isinstance(furl_or_file, str):
87 91 if is_valid(furl_or_file):
88 92 return furl_or_file
89 93 if os.path.isfile(furl_or_file):
90 94 with open(furl_or_file, 'r') as f:
91 95 furl = f.read().strip()
92 96 if is_valid(furl):
93 97 return furl
94 raise ValueError("Not a FURL or a file containing a FURL: %s" % furl_or_file)
98 raise FURLError("Not a valid FURL or FURL file: %s" % furl_or_file)
99
100
101 def is_valid_furl_or_file(furl_or_file):
102 """Validate a FURL or a FURL file.
103
104 If ``furl_or_file`` looks like a file, we simply make sure its directory
105 exists and that it has a ``.furl`` file extension. We don't try to see
106 if the FURL file exists or to read its contents. This is useful for
107 cases where auto re-connection is being used.
108 """
109 if isinstance(furl_or_file, str):
110 if is_valid(furl_or_file):
111 return True
112 if isinstance(furl_or_file, (str, unicode)):
113 path, furl_filename = os.path.split(furl_or_file)
114 if os.path.isdir(path) and furl_filename.endswith('.furl'):
115 return True
116 return False
117
118
119 def validate_furl_or_file(furl_or_file):
120 if not is_valid_furl_or_file(furl_or_file):
121 raise FURLError('Not a valid FURL or FURL file: %r' % furl_or_file)
95 122
96 123
97 124 def get_temp_furlfile(filename):
98 125 """Return a temporary FURL file."""
99 126 return tempfile.mktemp(dir=os.path.dirname(filename),
100 127 prefix=os.path.basename(filename))
101 128
102 129
103 130 def make_tub(ip, port, secure, cert_file):
104 131 """Create a listening tub given an ip, port, and cert_file location.
105 132
106 133 Parameters
107 134 ----------
108 135 ip : str
109 136 The ip address or hostname that the tub should listen on.
110 137 Empty means all interfaces.
111 138 port : int
112 139 The port that the tub should listen on. A value of 0 means
113 140 pick a random port
114 141 secure: bool
115 142 Will the connection be secure (in the Foolscap sense).
116 143 cert_file: str
117 144 A filename of a file to be used for theSSL certificate.
118 145
119 146 Returns
120 147 -------
121 148 A tub, listener tuple.
122 149 """
123 150 if secure:
124 151 if have_crypto:
125 152 tub = Tub(certFile=cert_file)
126 153 else:
127 154 raise SecurityError("OpenSSL/pyOpenSSL is not available, so we "
128 155 "can't run in secure mode. Try running without "
129 156 "security using 'ipcontroller -xy'.")
130 157 else:
131 158 tub = UnauthenticatedTub()
132 159
133 160 # Set the strport based on the ip and port and start listening
134 161 if ip == '':
135 162 strport = "tcp:%i" % port
136 163 else:
137 164 strport = "tcp:%i:interface=%s" % (port, ip)
138 165 log.msg("Starting listener with [secure=%r] on: %s" % (secure, strport))
139 166 listener = tub.listenOn(strport)
140 167
141 168 return tub, listener
142 169
143 170
144 171 class FCServiceFactory(AdaptedConfiguredObjectFactory):
145 172 """This class creates a tub with various services running in it.
146 173
147 174 The basic idea is that :meth:`create` returns a running :class:`Tub`
148 175 instance that has a number of Foolscap references registered in it.
149 176 This class is a subclass of :class:`IPython.core.component.Component`
150 177 so the IPython configuration and component system are used.
151 178
152 179 Attributes
153 180 ----------
154 181 interfaces : Config
155 182 A Config instance whose values are sub-Config objects having two
156 183 keys: furl_file and interface_chain.
157 184
158 185 The other attributes are the standard ones for Foolscap.
159 186 """
160 187
161 188 ip = Str('', config=True)
162 189 port = Int(0, config=True)
163 190 secure = Bool(True, config=True)
164 191 cert_file = Str('', config=True)
165 192 location = Str('', config=True)
166 193 reuse_furls = Bool(False, config=True)
167 194 interfaces = Instance(klass=Config, kw={}, allow_none=False, config=True)
168 195
169 196 def __init__(self, config, adaptee):
170 197 super(FCServiceFactory, self).__init__(config, adaptee)
171 198 self._check_reuse_furls()
172 199
173 200 def _ip_changed(self, name, old, new):
174 201 if new == 'localhost' or new == '127.0.0.1':
175 202 self.location = '127.0.0.1'
176 203
177 204 def _check_reuse_furls(self):
178 205 furl_files = [i.furl_file for i in self.interfaces.values()]
179 206 for ff in furl_files:
180 207 fullfile = self._get_security_file(ff)
181 208 if self.reuse_furls:
182 209 log.msg("Reusing FURL file: %s" % fullfile)
183 210 else:
184 211 if os.path.isfile(fullfile):
185 212 log.msg("Removing old FURL file: %s" % fullfile)
186 213 os.remove(fullfile)
187 214
188 215 def _get_security_file(self, filename):
189 216 return os.path.join(self.config.Global.security_dir, filename)
190 217
191 218 def create(self):
192 219 """Create and return the Foolscap tub with everything running."""
193 220
194 221 self.tub, self.listener = make_tub(
195 222 self.ip, self.port, self.secure,
196 223 self._get_security_file(self.cert_file)
197 224 )
198 225 # log.msg("Interfaces to register [%r]: %r" % \
199 226 # (self.__class__, self.interfaces))
200 227 if not self.secure:
201 228 log.msg("WARNING: running with no security: %s" % \
202 229 self.__class__.__name__)
203 230 reactor.callWhenRunning(self.set_location_and_register)
204 231 return self.tub
205 232
206 233 def set_location_and_register(self):
207 234 """Set the location for the tub and return a deferred."""
208 235
209 236 if self.location == '':
210 237 d = self.tub.setLocationAutomatically()
211 238 else:
212 239 d = defer.maybeDeferred(self.tub.setLocation,
213 240 "%s:%i" % (self.location, self.listener.getPortnum()))
214 241 self.adapt_to_interfaces(d)
215 242
216 243 def adapt_to_interfaces(self, d):
217 244 """Run through the interfaces, adapt and register."""
218 245
219 246 for ifname, ifconfig in self.interfaces.iteritems():
220 247 ff = self._get_security_file(ifconfig.furl_file)
221 248 log.msg("Adapting [%s] to interface: %s" % \
222 249 (self.adaptee.__class__.__name__, ifname))
223 250 log.msg("Saving FURL for interface [%s] to file: %s" % (ifname, ff))
224 251 check_furl_file_security(ff, self.secure)
225 252 adaptee = self.adaptee
226 253 for i in ifconfig.interface_chain:
227 254 adaptee = import_item(i)(adaptee)
228 255 d.addCallback(self.register, adaptee, furl_file=ff)
229 256
230 257 def register(self, empty, ref, furl_file):
231 258 """Register the reference with the FURL file.
232 259
233 260 The FURL file is created and then moved to make sure that when the
234 261 file appears, the buffer has been flushed and the file closed.
235 262 """
236 263 temp_furl_file = get_temp_furlfile(furl_file)
237 264 self.tub.registerReference(ref, furlFile=temp_furl_file)
238 265 os.rename(temp_furl_file, furl_file)
239 266
@@ -1,263 +1,274 b''
1 1 #!/usr/bin/env python
2 2 # encoding: utf-8
3 3
4 4 """Things directly related to all of twisted."""
5 5
6 6 #-----------------------------------------------------------------------------
7 7 # Copyright (C) 2008-2009 The IPython Development Team
8 8 #
9 9 # Distributed under the terms of the BSD License. The full license is in
10 10 # the file COPYING, distributed as part of this software.
11 11 #-----------------------------------------------------------------------------
12 12
13 13 #-----------------------------------------------------------------------------
14 14 # Imports
15 15 #-----------------------------------------------------------------------------
16 16
17 17 import os, sys
18 18 import threading, Queue, atexit
19 19
20 20 import twisted
21 21 from twisted.internet import defer, reactor
22 22 from twisted.python import log, failure
23 23
24 24 from IPython.kernel.error import FileTimeoutError
25 25
26 26 #-----------------------------------------------------------------------------
27 27 # Classes related to twisted and threads
28 28 #-----------------------------------------------------------------------------
29 29
30 30
31 31 class ReactorInThread(threading.Thread):
32 32 """Run the twisted reactor in a different thread.
33 33
34 34 For the process to be able to exit cleanly, do the following:
35 35
36 36 rit = ReactorInThread()
37 37 rit.setDaemon(True)
38 38 rit.start()
39 39
40 40 """
41 41
42 42 def run(self):
43 """Run the twisted reactor in a thread.
44
45 This runs the reactor with installSignalHandlers=0, which prevents
46 twisted from installing any of its own signal handlers. This needs to
47 be disabled because signal.signal can't be called in a thread. The
48 only problem with this is that SIGCHLD events won't be detected so
49 spawnProcess won't detect that its processes have been killed by
50 an external factor.
51 """
43 52 reactor.run(installSignalHandlers=0)
44 53 # self.join()
45 54
46 55 def stop(self):
47 56 # I don't think this does anything useful.
48 57 blockingCallFromThread(reactor.stop)
49 58 self.join()
50 59
51 60 if(twisted.version.major >= 8):
52 61 import twisted.internet.threads
53 62 def blockingCallFromThread(f, *a, **kw):
54 63 """
55 64 Run a function in the reactor from a thread, and wait for the result
56 65 synchronously, i.e. until the callback chain returned by the function get a
57 66 result.
58 67
59 68 Delegates to twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw),
60 69 passing twisted.internet.reactor for the first argument.
61 70
62 71 @param f: the callable to run in the reactor thread
63 72 @type f: any callable.
64 73 @param a: the arguments to pass to C{f}.
65 74 @param kw: the keyword arguments to pass to C{f}.
66 75
67 76 @return: the result of the callback chain.
68 77 @raise: any error raised during the callback chain.
69 78 """
70 79 return twisted.internet.threads.blockingCallFromThread(reactor, f, *a, **kw)
71 80
72 81 else:
73 82 def blockingCallFromThread(f, *a, **kw):
74 83 """
75 84 Run a function in the reactor from a thread, and wait for the result
76 85 synchronously, i.e. until the callback chain returned by the function get a
77 86 result.
78 87
79 88 @param f: the callable to run in the reactor thread
80 89 @type f: any callable.
81 90 @param a: the arguments to pass to C{f}.
82 91 @param kw: the keyword arguments to pass to C{f}.
83 92
84 93 @return: the result of the callback chain.
85 94 @raise: any error raised during the callback chain.
86 95 """
87 96 from twisted.internet import reactor
88 97 queue = Queue.Queue()
89 98 def _callFromThread():
90 99 result = defer.maybeDeferred(f, *a, **kw)
91 100 result.addBoth(queue.put)
92 101
93 102 reactor.callFromThread(_callFromThread)
94 103 result = queue.get()
95 104 if isinstance(result, failure.Failure):
96 105 # This makes it easier for the debugger to get access to the instance
97 106 try:
98 107 result.raiseException()
99 108 except Exception, e:
100 109 raise e
101 110 return result
102 111
103 112
104 113
105 114 #-------------------------------------------------------------------------------
106 115 # Things for managing deferreds
107 116 #-------------------------------------------------------------------------------
108 117
109 118
110 119 def parseResults(results):
111 120 """Pull out results/Failures from a DeferredList."""
112 121 return [x[1] for x in results]
113 122
114 123 def gatherBoth(dlist, fireOnOneCallback=0,
115 124 fireOnOneErrback=0,
116 125 consumeErrors=0,
117 126 logErrors=0):
118 127 """This is like gatherBoth, but sets consumeErrors=1."""
119 128 d = DeferredList(dlist, fireOnOneCallback, fireOnOneErrback,
120 129 consumeErrors, logErrors)
121 130 if not fireOnOneCallback:
122 131 d.addCallback(parseResults)
123 132 return d
124 133
125 134 SUCCESS = True
126 135 FAILURE = False
127 136
128 137 class DeferredList(defer.Deferred):
129 138 """I combine a group of deferreds into one callback.
130 139
131 140 I track a list of L{Deferred}s for their callbacks, and make a single
132 141 callback when they have all completed, a list of (success, result)
133 142 tuples, 'success' being a boolean.
134 143
135 144 Note that you can still use a L{Deferred} after putting it in a
136 145 DeferredList. For example, you can suppress 'Unhandled error in Deferred'
137 146 messages by adding errbacks to the Deferreds *after* putting them in the
138 147 DeferredList, as a DeferredList won't swallow the errors. (Although a more
139 148 convenient way to do this is simply to set the consumeErrors flag)
140 149
141 150 Note: This is a modified version of the twisted.internet.defer.DeferredList
142 151 """
143 152
144 153 fireOnOneCallback = 0
145 154 fireOnOneErrback = 0
146 155
147 156 def __init__(self, deferredList, fireOnOneCallback=0, fireOnOneErrback=0,
148 157 consumeErrors=0, logErrors=0):
149 158 """Initialize a DeferredList.
150 159
151 160 @type deferredList: C{list} of L{Deferred}s
152 161 @param deferredList: The list of deferreds to track.
153 162 @param fireOnOneCallback: (keyword param) a flag indicating that
154 163 only one callback needs to be fired for me to call
155 164 my callback
156 165 @param fireOnOneErrback: (keyword param) a flag indicating that
157 166 only one errback needs to be fired for me to call
158 167 my errback
159 168 @param consumeErrors: (keyword param) a flag indicating that any errors
160 169 raised in the original deferreds should be
161 170 consumed by this DeferredList. This is useful to
162 171 prevent spurious warnings being logged.
163 172 """
164 173 self.resultList = [None] * len(deferredList)
165 174 defer.Deferred.__init__(self)
166 175 if len(deferredList) == 0 and not fireOnOneCallback:
167 176 self.callback(self.resultList)
168 177
169 178 # These flags need to be set *before* attaching callbacks to the
170 179 # deferreds, because the callbacks use these flags, and will run
171 180 # synchronously if any of the deferreds are already fired.
172 181 self.fireOnOneCallback = fireOnOneCallback
173 182 self.fireOnOneErrback = fireOnOneErrback
174 183 self.consumeErrors = consumeErrors
175 184 self.logErrors = logErrors
176 185 self.finishedCount = 0
177 186
178 187 index = 0
179 188 for deferred in deferredList:
180 189 deferred.addCallbacks(self._cbDeferred, self._cbDeferred,
181 190 callbackArgs=(index,SUCCESS),
182 191 errbackArgs=(index,FAILURE))
183 192 index = index + 1
184 193
185 194 def _cbDeferred(self, result, index, succeeded):
186 195 """(internal) Callback for when one of my deferreds fires.
187 196 """
188 197 self.resultList[index] = (succeeded, result)
189 198
190 199 self.finishedCount += 1
191 200 if not self.called:
192 201 if succeeded == SUCCESS and self.fireOnOneCallback:
193 202 self.callback((result, index))
194 203 elif succeeded == FAILURE and self.fireOnOneErrback:
195 204 # We have modified this to fire the errback chain with the actual
196 205 # Failure instance the originally occured rather than twisted's
197 206 # FirstError which wraps the failure
198 207 self.errback(result)
199 208 elif self.finishedCount == len(self.resultList):
200 209 self.callback(self.resultList)
201 210
202 211 if succeeded == FAILURE and self.logErrors:
203 212 log.err(result)
204 213 if succeeded == FAILURE and self.consumeErrors:
205 214 result = None
206 215
207 216 return result
208 217
209 218
210 219 def wait_for_file(filename, delay=0.1, max_tries=10):
211 220 """Wait (poll) for a file to be created.
212 221
213 222 This method returns a Deferred that will fire when a file exists. It
214 223 works by polling os.path.isfile in time intervals specified by the
215 224 delay argument. If `max_tries` is reached, it will errback with a
216 225 `FileTimeoutError`.
217 226
218 227 Parameters
219 228 ----------
220 229 filename : str
221 230 The name of the file to wait for.
222 231 delay : float
223 232 The time to wait between polls.
224 233 max_tries : int
225 234 The max number of attempts before raising `FileTimeoutError`
226 235
227 236 Returns
228 237 -------
229 238 d : Deferred
230 239 A Deferred instance that will fire when the file exists.
231 240 """
232 241
233 242 d = defer.Deferred()
234 243
235 244 def _test_for_file(filename, attempt=0):
236 245 if attempt >= max_tries:
237 246 d.errback(FileTimeoutError(
238 247 'timeout waiting for file to be created: %s' % filename
239 248 ))
240 249 else:
241 250 if os.path.isfile(filename):
242 251 d.callback(True)
243 252 else:
244 253 reactor.callLater(delay, _test_for_file, filename, attempt+1)
245 254
246 255 _test_for_file(filename)
247 256 return d
248 257
249 258
250 259 def sleep_deferred(seconds):
251 260 """Sleep without blocking the event loop."""
252 261 d = defer.Deferred()
253 262 reactor.callLater(seconds, d.callback, seconds)
254 263 return d
255 264
256 265
257 266 def make_deferred(func):
258 267 """A decorator that calls a function with :func`maybeDeferred`."""
259 268
260 269 def _wrapper(*args, **kwargs):
261 270 return defer.maybeDeferred(func, *args, **kwargs)
262 271
263 return _wrapper No newline at end of file
272 return _wrapper
273
274
General Comments 0
You need to be logged in to leave comments. Login now